Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.polaris.spark;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -31,6 +33,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.rest.Endpoint;
import org.apache.iceberg.rest.ErrorHandlers;
Expand All @@ -39,7 +42,9 @@
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.polaris.core.rest.PolarisEndpoints;
import org.apache.polaris.core.rest.PolarisResourcePaths;
import org.apache.polaris.service.types.GenericTable;
Expand All @@ -52,13 +57,16 @@
* objects.
*/
public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
public static final String REST_PAGE_SIZE = "rest-page-size";

private final Function<Map<String, String>, RESTClient> clientBuilder;

private RESTClient restClient = null;
private CloseableGroup closeables = null;
private Set<Endpoint> endpoints;
private OAuth2Util.AuthSession catalogAuth = null;
private PolarisResourcePaths pathGenerator = null;
private Integer pageSize = null;

// the default endpoints to config if server doesn't specify the 'endpoints' configuration.
private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
Expand Down Expand Up @@ -101,6 +109,12 @@ public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession ca
this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);

this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
if (pageSize != null) {
Preconditions.checkArgument(
pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
}

this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.restClient);
this.closeables.setSuppressCloseFailure(true);
Expand Down Expand Up @@ -138,12 +152,49 @@ public void close() throws IOException {

@Override
public List<TableIdentifier> listGenericTables(Namespace ns) {
throw new UnsupportedOperationException("listTables not supported");
Endpoint.check(endpoints, PolarisEndpoints.V1_LIST_GENERIC_TABLES);

Map<String, String> queryParams = Maps.newHashMap();
ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

do {
queryParams.put("pageToken", pageToken);
ListTablesResponse response =
restClient
.withAuthSession(this.catalogAuth)
.get(
pathGenerator.genericTables(ns),
queryParams,
ListTablesResponse.class,
Map.of(),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
tables.addAll(response.identifiers());
} while (pageToken != null);

return tables.build();
}

@Override
public boolean dropGenericTable(TableIdentifier identifier) {
throw new UnsupportedOperationException("dropTable not supported");
Endpoint.check(endpoints, PolarisEndpoints.V1_DELETE_GENERIC_TABLE);

try {
restClient
.withAuthSession(this.catalogAuth)
.delete(
pathGenerator.genericTable(identifier),
null,
Map.of(),
ErrorHandlers.tableErrorHandler());
return true;
} catch (NoSuchTableException e) {
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.spark;

import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.polaris.service.types.GenericTable;
Expand Down Expand Up @@ -90,12 +91,19 @@ public Table createTable(
@Override
public Table alterTable(Identifier identifier, TableChange... changes)
throws NoSuchTableException {
throw new NoSuchTableException(identifier);
// alterTable currently is not supported for generic tables
throw new UnsupportedOperationException("alterTable operation is not supported");
}

@Override
public boolean purgeTable(Identifier ident) {
// purgeTable for generic table will only do a drop without purge
return dropTable(ident);
}

@Override
public boolean dropTable(Identifier identifier) {
return false;
return this.polarisCatalog.dropGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
}

@Override
Expand All @@ -106,6 +114,12 @@ public void renameTable(Identifier from, Identifier to)

@Override
public Identifier[] listTables(String[] namespace) {
throw new UnsupportedOperationException("listTables operation is not supported");
try {
return this.polarisCatalog.listGenericTables(Namespace.of(namespace)).stream()
.map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
.toArray(Identifier[]::new);
} catch (UnsupportedOperationException ex) {
return new Identifier[0];
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
Expand Down Expand Up @@ -161,33 +163,59 @@ public Table createTable(

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
throw new UnsupportedOperationException("alterTable");
try {
return this.icebergsSparkCatalog.alterTable(ident, changes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unfortunate that every time we update a generic table, we need a rest call to Polaris. I guess we are working on something to improve on this. That should be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, sorry, i think the approach that I mentioned about include table format in the list tables will not address your concern here. To improve this, i think in the future, we can introduce caching at client side with table format, so if we already know the format, we will not need to do extra calls.

} catch (NoSuchTableException e) {
Table table = this.polarisSparkCatalog.loadTable(ident);
String provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
if (PolarisCatalogUtils.useDelta(provider)) {
// For delta table, most of the alter operations is a delta log manipulation,
// we load the delta catalog to help handling the alter table operation.
// NOTE: This currently doesn't work for changing file location and file format
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
}
return this.polarisSparkCatalog.alterTable(ident);
}
}

@Override
public boolean dropTable(Identifier ident) {
throw new UnsupportedOperationException("dropTable");
return this.icebergsSparkCatalog.dropTable(ident) || this.polarisSparkCatalog.dropTable(ident);
}

@Override
public void renameTable(Identifier from, Identifier to)
throws NoSuchTableException, TableAlreadyExistsException {
throw new UnsupportedOperationException("renameTable");
try {
this.icebergsSparkCatalog.renameTable(from, to);
} catch (NoSuchTableException e) {
this.polarisSparkCatalog.renameTable(from, to);
}
}

@Override
public void invalidateTable(Identifier ident) {
throw new UnsupportedOperationException("invalidateTable");
this.icebergsSparkCatalog.invalidateTable(ident);
}

@Override
public boolean purgeTable(Identifier ident) {
throw new UnsupportedOperationException("purgeTable");
if (this.icebergsSparkCatalog.purgeTable(ident)) {
return true;
} else {
return this.polarisSparkCatalog.purgeTable(ident);
}
}

@Override
public Identifier[] listTables(String[] namespace) {
throw new UnsupportedOperationException("listTables");
Identifier[] icebergIdents = this.icebergsSparkCatalog.listTables(namespace);
Identifier[] genericTableIdents = this.polarisSparkCatalog.listTables(namespace);

return Stream.concat(Arrays.stream(icebergIdents), Arrays.stream(genericTableIdents))
.toArray(Identifier[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark.rest;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTResponse;
import org.apache.polaris.service.types.ListGenericTablesResponse;

/**
* RESTResponse definition for ListGenericTable which extends the iceberg RESTResponse. This is
* currently required because the Iceberg HTTPClient requires the request and response to be a class
* of RESTRequest and RESTResponse.
*/
public class ListGenericTablesRESTResponse extends ListGenericTablesResponse
implements RESTResponse {

@JsonCreator
public ListGenericTablesRESTResponse(
@JsonProperty(value = "next-page-token") String nextPageToken,
@JsonProperty(value = "identifiers") Set<TableIdentifier> identifiers) {
super(nextPageToken, identifiers);
}

@Override
public void validate() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.polaris.spark;

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableChange;

/**
* This is a fake delta catalog class that is used for testing. This class is a noop class that
Expand All @@ -29,4 +33,9 @@ public class NoopDeltaCatalog extends DelegatingCatalogExtension {
// This is a mock of isUnityCatalog scala val in
// org.apache.spark.sql.delta.catalog.DeltaCatalog.
private boolean isUnityCatalog = false;

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
return super.loadTable(ident);
}
}
Loading