Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,35 @@
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.*;
import org.apache.spark.sql.connector.catalog.*;
import org.apache.iceberg.spark.SupportsReplaceView;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.connector.catalog.ViewCatalog;
import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatalog {
public class SparkCatalog
implements StagingTableCatalog,
TableCatalog,
SupportsNamespaces,
ViewCatalog,
SupportsReplaceView {

private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private String catalogName = null;
private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
Expand All @@ -43,6 +64,8 @@ public String name() {
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
this.icebergsSparkCatalog.initialize(name, options);
}

@Override
Expand Down Expand Up @@ -73,58 +96,88 @@ public void renameTable(Identifier from, Identifier to)
throw new UnsupportedOperationException("renameTable");
}

@Override
public void invalidateTable(Identifier ident) {
throw new UnsupportedOperationException("invalidateTable");
}

@Override
public boolean purgeTable(Identifier ident) {
throw new UnsupportedOperationException("purgeTable");
}

@Override
public Identifier[] listTables(String[] namespace) {
throw new UnsupportedOperationException("listTables");
}

@Override
public StagedTable stageCreate(
Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
throws TableAlreadyExistsException {
return this.icebergsSparkCatalog.stageCreate(ident, schema, transforms, properties);
}

@Override
public StagedTable stageReplace(
Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
throws NoSuchTableException {
return this.icebergsSparkCatalog.stageReplace(ident, schema, transforms, properties);
}

@Override
public StagedTable stageCreateOrReplace(
Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) {
return this.icebergsSparkCatalog.stageCreateOrReplace(ident, schema, transforms, properties);
}

@Override
public String[] defaultNamespace() {
throw new UnsupportedOperationException("defaultNamespace");
return this.icebergsSparkCatalog.defaultNamespace();
}

@Override
public String[][] listNamespaces() {
throw new UnsupportedOperationException("listNamespaces");
return this.icebergsSparkCatalog.listNamespaces();
}

@Override
public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
throw new UnsupportedOperationException("listNamespaces");
return this.icebergsSparkCatalog.listNamespaces(namespace);
}

@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
throw new UnsupportedOperationException("loadNamespaceMetadata");
return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
}

@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
throw new UnsupportedOperationException("createNamespace");
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
throw new UnsupportedOperationException("alterNamespace");
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
}

@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
throw new UnsupportedOperationException("dropNamespace");
return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
}

@Override
public Identifier[] listViews(String... namespace) {
throw new UnsupportedOperationException("listViews");
return this.icebergsSparkCatalog.listViews(namespace);
}

@Override
public View loadView(Identifier ident) throws NoSuchViewException {
throw new UnsupportedOperationException("loadView");
return this.icebergsSparkCatalog.loadView(ident);
}

@Override
Expand All @@ -139,23 +192,56 @@ public View createView(
String[] columnComments,
Map<String, String> properties)
throws ViewAlreadyExistsException, NoSuchNamespaceException {
throw new UnsupportedOperationException("createView");
return this.icebergsSparkCatalog.createView(
ident,
sql,
currentCatalog,
currentNamespace,
schema,
queryColumnNames,
columnAliases,
columnComments,
properties);
}

@Override
public View alterView(Identifier ident, ViewChange... changes)
throws NoSuchViewException, IllegalArgumentException {
throw new UnsupportedOperationException("alterView");
return this.icebergsSparkCatalog.alterView(ident, changes);
}

@Override
public boolean dropView(Identifier ident) {
throw new UnsupportedOperationException("dropView");
return this.icebergsSparkCatalog.dropView(ident);
}

@Override
public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
throws NoSuchViewException, ViewAlreadyExistsException {
throw new UnsupportedOperationException("renameView");
this.icebergsSparkCatalog.renameView(fromIdentifier, toIdentifier);
}

@Override
public View replaceView(
Identifier ident,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties)
throws NoSuchNamespaceException, NoSuchViewException {
return this.icebergsSparkCatalog.replaceView(
ident,
sql,
currentCatalog,
currentNamespace,
schema,
queryColumnNames,
columnAliases,
columnComments,
properties);
}
}
Loading