From cee5f607f202f60a6fea20762d08a425fc1e0c5f Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 20:59:52 -0700 Subject: [PATCH 1/9] compiles --- .../catalog/iceberg/CatalogHandlerUtils.java | 599 ++++++++++++++++++ 1 file changed, 599 insertions(+) create mode 100644 service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java new file mode 100644 index 0000000000..8784d09e91 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetadataTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.SQLViewRepresentation; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewBuilder; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; +import org.apache.iceberg.view.ViewRepresentation; + +/** + * CODE_COPIED_TO_POLARIS + * Copied from CatalogHandler in Iceberg 1.8.0 + * Contains a collection of utilities related to managing Iceberg entities + */ +public class CatalogHandlerUtils { + private static final Schema EMPTY_SCHEMA = new Schema(); + private static final String INITIAL_PAGE_TOKEN = ""; + + private CatalogHandlerUtils() {} + + /** + * Exception used to avoid retrying commits when assertions fail. + * + *

When a REST assertion fails, it will throw CommitFailedException to send back to the client. + * But the assertion checks happen in the block that is retried if {@link + * TableOperations#commit(TableMetadata, TableMetadata)} throws CommitFailedException. This is + * used to avoid retries for assertion failures, which are unwrapped and rethrown outside of the + * commit loop. + */ + private static class ValidationFailureException extends RuntimeException { + private final CommitFailedException wrapped; + + private ValidationFailureException(CommitFailedException cause) { + super(cause); + this.wrapped = cause; + } + + public CommitFailedException wrapped() { + return wrapped; + } + } + + private static Pair, String> paginate(List list, String pageToken, int pageSize) { + int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); + if (pageStart >= list.size()) { + return Pair.of(Collections.emptyList(), null); + } + + int end = Math.min(pageStart + pageSize, list.size()); + List subList = list.subList(pageStart, end); + String nextPageToken = end >= list.size() ? null : String.valueOf(end); + + return Pair.of(subList, nextPageToken); + } + + public static ListNamespacesResponse listNamespaces( + SupportsNamespaces catalog, Namespace parent) { + List results; + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); + } + + return ListNamespacesResponse.builder().addAll(results).build(); + } + + public static ListNamespacesResponse listNamespaces( + SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { + List results; + + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); + } + + Pair, String> page = paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListNamespacesResponse.builder() + .addAll(page.first()) + .nextPageToken(page.second()) + .build(); + } + + public static CreateNamespaceResponse createNamespace( + SupportsNamespaces catalog, CreateNamespaceRequest request) { + Namespace namespace = request.namespace(); + catalog.createNamespace(namespace, request.properties()); + return CreateNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(catalog.loadNamespaceMetadata(namespace)) + .build(); + } + + public static void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { + if (!catalog.namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + public static GetNamespaceResponse loadNamespace( + SupportsNamespaces catalog, Namespace namespace) { + Map properties = catalog.loadNamespaceMetadata(namespace); + return GetNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(properties) + .build(); + } + + public static void dropNamespace(SupportsNamespaces catalog, Namespace namespace) { + boolean dropped = catalog.dropNamespace(namespace); + if (!dropped) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + public static UpdateNamespacePropertiesResponse updateNamespaceProperties( + SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { + request.validate(); + + Set removals = Sets.newHashSet(request.removals()); + Map updates = request.updates(); + + Map startProperties = catalog.loadNamespaceMetadata(namespace); + Set missing = Sets.difference(removals, startProperties.keySet()); + + if (!updates.isEmpty()) { + catalog.setProperties(namespace, updates); + } + + if (!removals.isEmpty()) { + // remove the original set just in case there was an update just after loading properties + catalog.removeProperties(namespace, removals); + } + + return UpdateNamespacePropertiesResponse.builder() + .addMissing(missing) + .addUpdated(updates.keySet()) + .addRemoved(Sets.difference(removals, missing)) + .build(); + } + + public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) { + List idents = catalog.listTables(namespace); + return ListTablesResponse.builder().addAll(idents).build(); + } + + public static ListTablesResponse listTables( + Catalog catalog, Namespace namespace, String pageToken, String pageSize) { + List results = catalog.listTables(namespace); + + Pair, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public static LoadTableResponse stageTableCreate( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + if (catalog.tableExists(ident)) { + throw new AlreadyExistsException("Table already exists: %s", ident); + } + + Map properties = Maps.newHashMap(); + properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); + properties.putAll(request.properties()); + + String location; + if (request.location() != null) { + location = request.location(); + } else { + location = + catalog + .buildTable(ident, request.schema()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(properties) + .createTransaction() + .table() + .location(); + } + + TableMetadata metadata = + TableMetadata.newTableMetadata( + request.schema(), + request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(), + request.writeOrder() != null ? request.writeOrder() : SortOrder.unsorted(), + location, + properties); + + return LoadTableResponse.builder().withTableMetadata(metadata).build(); + } + + public static LoadTableResponse createTable( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + Table table = + catalog + .buildTable(ident, request.schema()) + .withLocation(request.location()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(request.properties()) + .create(); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static LoadTableResponse registerTable( + Catalog catalog, Namespace namespace, RegisterTableRequest request) { + request.validate(); + + TableIdentifier identifier = TableIdentifier.of(namespace, request.name()); + Table table = catalog.registerTable(identifier, request.metadataLocation()); + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static void dropTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, false); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static void purgeTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, true); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static void tableExists(Catalog catalog, TableIdentifier ident) { + boolean exists = catalog.tableExists(ident); + if (!exists) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { + Table table = catalog.loadTable(ident); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } else if (table instanceof BaseMetadataTable) { + // metadata tables are loaded on the client side, return NoSuchTableException for now + throw new NoSuchTableException("Table does not exist: %s", ident.toString()); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static LoadTableResponse updateTable( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + TableMetadata finalMetadata; + if (isCreate(request)) { + // this is a hacky way to get TableOperations for an uncommitted table + Transaction transaction = + catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); + if (transaction instanceof BaseTransaction) { + BaseTransaction baseTransaction = (BaseTransaction) transaction; + finalMetadata = create(baseTransaction.underlyingOps(), request); + } else { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTransaction"); + } + + } else { + Table table = catalog.loadTable(ident); + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + finalMetadata = commit(ops, request); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + } + + return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); + } + + public static void renameTable(Catalog catalog, RenameTableRequest request) { + catalog.renameTable(request.source(), request.destination()); + } + + private static boolean isCreate(UpdateTableRequest request) { + boolean isCreate = + request.requirements().stream() + .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List invalidRequirements = + request.requirements().stream() + .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument( + invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + + private static TableMetadata create(TableOperations ops, UpdateTableRequest request) { + // the only valid requirement is that the table will be created + request.requirements().forEach(requirement -> requirement.validate(ops.current())); + Optional formatVersion = + request.updates().stream() + .filter(update -> update instanceof UpgradeFormatVersion) + .map(update -> ((UpgradeFormatVersion) update).formatVersion()) + .findFirst(); + + TableMetadata.Builder builder = + formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty); + request.updates().forEach(update -> update.applyTo(builder)); + // create transactions do not retry. if the table exists, retrying is not a solution + ops.commit(null, builder.build()); + + return ops.current(); + } + + static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(COMMIT_NUM_RETRIES_DEFAULT) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + TableMetadata updated = metadataBuilder.build(); + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } + + private static BaseView asBaseView(View view) { + Preconditions.checkState( + view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView"); + return (BaseView) view; + } + + public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { + return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build(); + } + + public static ListTablesResponse listViews( + ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { + List results = catalog.listViews(namespace); + + Pair, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public static LoadViewResponse createView( + ViewCatalog catalog, Namespace namespace, CreateViewRequest request) { + request.validate(); + + ViewBuilder viewBuilder = + catalog + .buildView(TableIdentifier.of(namespace, request.name())) + .withSchema(request.schema()) + .withProperties(request.properties()) + .withDefaultNamespace(request.viewVersion().defaultNamespace()) + .withDefaultCatalog(request.viewVersion().defaultCatalog()) + .withLocation(request.location()); + + Set unsupportedRepresentations = + request.viewVersion().representations().stream() + .filter(r -> !(r instanceof SQLViewRepresentation)) + .map(ViewRepresentation::type) + .collect(Collectors.toSet()); + + if (!unsupportedRepresentations.isEmpty()) { + throw new IllegalStateException( + String.format("Found unsupported view representations: %s", unsupportedRepresentations)); + } + + request.viewVersion().representations().stream() + .filter(SQLViewRepresentation.class::isInstance) + .map(SQLViewRepresentation.class::cast) + .forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql())); + + View view = viewBuilder.create(); + + return viewResponse(view); + } + + private static LoadViewResponse viewResponse(View view) { + ViewMetadata metadata = asBaseView(view).operations().current(); + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public static void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { + if (!catalog.viewExists(viewIdentifier)) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + public static LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + View view = catalog.loadView(viewIdentifier); + return viewResponse(view); + } + + public static LoadViewResponse updateView( + ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) { + View view = catalog.loadView(ident); + ViewMetadata metadata = commit(asBaseView(view).operations(), request); + + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public static void renameView(ViewCatalog catalog, RenameTableRequest request) { + catalog.renameView(request.source(), request.destination()); + } + + public static void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + boolean dropped = catalog.dropView(viewIdentifier); + if (!dropped) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(COMMIT_NUM_RETRIES_DEFAULT) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + ViewMetadata.Builder metadataBuilder = ViewMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + ViewMetadata updated = metadataBuilder.build(); + + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } +} \ No newline at end of file From b1fa920b554220cc5988f266c56ddf80c606e81b Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:38:36 -0700 Subject: [PATCH 2/9] wire it up --- .../quarkus/admin/PolarisAuthzTestBase.java | 2 + .../IcebergCatalogHandlerAuthzTest.java | 9 +- .../catalog/iceberg/CatalogHandlerUtils.java | 988 +++++++++--------- .../iceberg/IcebergCatalogAdapter.java | 14 +- .../iceberg/IcebergCatalogHandler.java | 5 +- .../apache/polaris/service/TestServices.java | 6 +- 6 files changed, 523 insertions(+), 501 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 91f0a33d61..f4806df090 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -81,6 +81,7 @@ import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.generic.GenericTableCatalog; +import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.policy.PolicyCatalog; @@ -192,6 +193,7 @@ public Map getConfigOverrides() { @Inject protected Clock clock; @Inject protected FileIOFactory fileIOFactory; @Inject protected PolarisEventListener polarisEventListener; + @Inject protected CatalogHandlerUtils catalogHandlerUtils; protected IcebergCatalog baseCatalog; protected GenericTableCatalog genericTableCatalog; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java index 08dd790e90..31bdd6c010 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java @@ -116,7 +116,8 @@ private IcebergCatalogHandler newWrapper( factory, catalogName, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); } /** @@ -256,7 +257,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { callContextCatalogFactory, CATALOG_NAME, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -290,7 +292,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { callContextCatalogFactory, CATALOG_NAME, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java index 8784d09e91..4e08e1b60b 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -23,6 +23,11 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Collections; @@ -54,9 +59,6 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.CreateViewRequest; @@ -81,519 +83,525 @@ import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewOperations; import org.apache.iceberg.view.ViewRepresentation; +import org.apache.polaris.core.config.PolarisConfigurationStore; /** - * CODE_COPIED_TO_POLARIS - * Copied from CatalogHandler in Iceberg 1.8.0 - * Contains a collection of utilities related to managing Iceberg entities + * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains a collection of + * utilities related to managing Iceberg entities */ +@ApplicationScoped public class CatalogHandlerUtils { - private static final Schema EMPTY_SCHEMA = new Schema(); - private static final String INITIAL_PAGE_TOKEN = ""; - - private CatalogHandlerUtils() {} - - /** - * Exception used to avoid retrying commits when assertions fail. - * - *

When a REST assertion fails, it will throw CommitFailedException to send back to the client. - * But the assertion checks happen in the block that is retried if {@link - * TableOperations#commit(TableMetadata, TableMetadata)} throws CommitFailedException. This is - * used to avoid retries for assertion failures, which are unwrapped and rethrown outside of the - * commit loop. - */ - private static class ValidationFailureException extends RuntimeException { - private final CommitFailedException wrapped; - - private ValidationFailureException(CommitFailedException cause) { - super(cause); - this.wrapped = cause; - } - - public CommitFailedException wrapped() { - return wrapped; - } - } - - private static Pair, String> paginate(List list, String pageToken, int pageSize) { - int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); - if (pageStart >= list.size()) { - return Pair.of(Collections.emptyList(), null); - } - - int end = Math.min(pageStart + pageSize, list.size()); - List subList = list.subList(pageStart, end); - String nextPageToken = end >= list.size() ? null : String.valueOf(end); + private static final Schema EMPTY_SCHEMA = new Schema(); + private static final String INITIAL_PAGE_TOKEN = ""; + + private final PolarisConfigurationStore configurationStore; + + @Inject + public CatalogHandlerUtils(PolarisConfigurationStore configurationStore) { + this.configurationStore = configurationStore; + } + + /** + * Exception used to avoid retrying commits when assertions fail. + * + *

When a REST assertion fails, it will throw CommitFailedException to send back to the client. + * But the assertion checks happen in the block that is retried if {@link + * TableOperations#commit(TableMetadata, TableMetadata)} throws CommitFailedException. This is + * used to avoid retries for assertion failures, which are unwrapped and rethrown outside of the + * commit loop. + */ + private static class ValidationFailureException extends RuntimeException { + private final CommitFailedException wrapped; - return Pair.of(subList, nextPageToken); + private ValidationFailureException(CommitFailedException cause) { + super(cause); + this.wrapped = cause; } - public static ListNamespacesResponse listNamespaces( - SupportsNamespaces catalog, Namespace parent) { - List results; - if (parent.isEmpty()) { - results = catalog.listNamespaces(); - } else { - results = catalog.listNamespaces(parent); - } - - return ListNamespacesResponse.builder().addAll(results).build(); - } - - public static ListNamespacesResponse listNamespaces( - SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { - List results; - - if (parent.isEmpty()) { - results = catalog.listNamespaces(); - } else { - results = catalog.listNamespaces(parent); - } - - Pair, String> page = paginate(results, pageToken, Integer.parseInt(pageSize)); - - return ListNamespacesResponse.builder() - .addAll(page.first()) - .nextPageToken(page.second()) - .build(); + public CommitFailedException wrapped() { + return wrapped; } + } - public static CreateNamespaceResponse createNamespace( - SupportsNamespaces catalog, CreateNamespaceRequest request) { - Namespace namespace = request.namespace(); - catalog.createNamespace(namespace, request.properties()); - return CreateNamespaceResponse.builder() - .withNamespace(namespace) - .setProperties(catalog.loadNamespaceMetadata(namespace)) - .build(); + private static Pair, String> paginate(List list, String pageToken, int pageSize) { + int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); + if (pageStart >= list.size()) { + return Pair.of(Collections.emptyList(), null); } - public static void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { - if (!catalog.namespaceExists(namespace)) { - throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); - } - } + int end = Math.min(pageStart + pageSize, list.size()); + List subList = list.subList(pageStart, end); + String nextPageToken = end >= list.size() ? null : String.valueOf(end); - public static GetNamespaceResponse loadNamespace( - SupportsNamespaces catalog, Namespace namespace) { - Map properties = catalog.loadNamespaceMetadata(namespace); - return GetNamespaceResponse.builder() - .withNamespace(namespace) - .setProperties(properties) - .build(); - } + return Pair.of(subList, nextPageToken); + } - public static void dropNamespace(SupportsNamespaces catalog, Namespace namespace) { - boolean dropped = catalog.dropNamespace(namespace); - if (!dropped) { - throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); - } + public static ListNamespacesResponse listNamespaces( + SupportsNamespaces catalog, Namespace parent) { + List results; + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); } - public static UpdateNamespacePropertiesResponse updateNamespaceProperties( - SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { - request.validate(); - - Set removals = Sets.newHashSet(request.removals()); - Map updates = request.updates(); - - Map startProperties = catalog.loadNamespaceMetadata(namespace); - Set missing = Sets.difference(removals, startProperties.keySet()); - - if (!updates.isEmpty()) { - catalog.setProperties(namespace, updates); - } + return ListNamespacesResponse.builder().addAll(results).build(); + } - if (!removals.isEmpty()) { - // remove the original set just in case there was an update just after loading properties - catalog.removeProperties(namespace, removals); - } + public static ListNamespacesResponse listNamespaces( + SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { + List results; - return UpdateNamespacePropertiesResponse.builder() - .addMissing(missing) - .addUpdated(updates.keySet()) - .addRemoved(Sets.difference(removals, missing)) - .build(); + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); } - public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) { - List idents = catalog.listTables(namespace); - return ListTablesResponse.builder().addAll(idents).build(); - } + Pair, String> page = paginate(results, pageToken, Integer.parseInt(pageSize)); - public static ListTablesResponse listTables( - Catalog catalog, Namespace namespace, String pageToken, String pageSize) { - List results = catalog.listTables(namespace); + return ListNamespacesResponse.builder() + .addAll(page.first()) + .nextPageToken(page.second()) + .build(); + } - Pair, String> page = - paginate(results, pageToken, Integer.parseInt(pageSize)); + public static CreateNamespaceResponse createNamespace( + SupportsNamespaces catalog, CreateNamespaceRequest request) { + Namespace namespace = request.namespace(); + catalog.createNamespace(namespace, request.properties()); + return CreateNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(catalog.loadNamespaceMetadata(namespace)) + .build(); + } + + public static void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { + if (!catalog.namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } - return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); - } + public static GetNamespaceResponse loadNamespace( + SupportsNamespaces catalog, Namespace namespace) { + Map properties = catalog.loadNamespaceMetadata(namespace); + return GetNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(properties) + .build(); + } + + public static void dropNamespace(SupportsNamespaces catalog, Namespace namespace) { + boolean dropped = catalog.dropNamespace(namespace); + if (!dropped) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + public static UpdateNamespacePropertiesResponse updateNamespaceProperties( + SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { + request.validate(); + + Set removals = Sets.newHashSet(request.removals()); + Map updates = request.updates(); + + Map startProperties = catalog.loadNamespaceMetadata(namespace); + Set missing = Sets.difference(removals, startProperties.keySet()); + + if (!updates.isEmpty()) { + catalog.setProperties(namespace, updates); + } - public static LoadTableResponse stageTableCreate( - Catalog catalog, Namespace namespace, CreateTableRequest request) { - request.validate(); - - TableIdentifier ident = TableIdentifier.of(namespace, request.name()); - if (catalog.tableExists(ident)) { - throw new AlreadyExistsException("Table already exists: %s", ident); - } - - Map properties = Maps.newHashMap(); - properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); - properties.putAll(request.properties()); - - String location; - if (request.location() != null) { - location = request.location(); - } else { - location = - catalog - .buildTable(ident, request.schema()) - .withPartitionSpec(request.spec()) - .withSortOrder(request.writeOrder()) - .withProperties(properties) - .createTransaction() - .table() - .location(); - } - - TableMetadata metadata = - TableMetadata.newTableMetadata( - request.schema(), - request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(), - request.writeOrder() != null ? request.writeOrder() : SortOrder.unsorted(), - location, - properties); - - return LoadTableResponse.builder().withTableMetadata(metadata).build(); + if (!removals.isEmpty()) { + // remove the original set just in case there was an update just after loading properties + catalog.removeProperties(namespace, removals); } - public static LoadTableResponse createTable( - Catalog catalog, Namespace namespace, CreateTableRequest request) { - request.validate(); - - TableIdentifier ident = TableIdentifier.of(namespace, request.name()); - Table table = - catalog - .buildTable(ident, request.schema()) - .withLocation(request.location()) - .withPartitionSpec(request.spec()) - .withSortOrder(request.writeOrder()) - .withProperties(request.properties()) - .create(); - - if (table instanceof BaseTable) { - return LoadTableResponse.builder() - .withTableMetadata(((BaseTable) table).operations().current()) - .build(); - } + return UpdateNamespacePropertiesResponse.builder() + .addMissing(missing) + .addUpdated(updates.keySet()) + .addRemoved(Sets.difference(removals, missing)) + .build(); + } + + public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) { + List idents = catalog.listTables(namespace); + return ListTablesResponse.builder().addAll(idents).build(); + } + + public static ListTablesResponse listTables( + Catalog catalog, Namespace namespace, String pageToken, String pageSize) { + List results = catalog.listTables(namespace); + + Pair, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public static LoadTableResponse stageTableCreate( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - } - - public static LoadTableResponse registerTable( - Catalog catalog, Namespace namespace, RegisterTableRequest request) { - request.validate(); - - TableIdentifier identifier = TableIdentifier.of(namespace, request.name()); - Table table = catalog.registerTable(identifier, request.metadataLocation()); - if (table instanceof BaseTable) { - return LoadTableResponse.builder() - .withTableMetadata(((BaseTable) table).operations().current()) - .build(); - } + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + if (catalog.tableExists(ident)) { + throw new AlreadyExistsException("Table already exists: %s", ident); + } + + Map properties = Maps.newHashMap(); + properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); + properties.putAll(request.properties()); + String location; + if (request.location() != null) { + location = request.location(); + } else { + location = + catalog + .buildTable(ident, request.schema()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(properties) + .createTransaction() + .table() + .location(); + } + + TableMetadata metadata = + TableMetadata.newTableMetadata( + request.schema(), + request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(), + request.writeOrder() != null ? request.writeOrder() : SortOrder.unsorted(), + location, + properties); + + return LoadTableResponse.builder().withTableMetadata(metadata).build(); + } + + public static LoadTableResponse createTable( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + Table table = + catalog + .buildTable(ident, request.schema()) + .withLocation(request.location()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(request.properties()) + .create(); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static LoadTableResponse registerTable( + Catalog catalog, Namespace namespace, RegisterTableRequest request) { + request.validate(); + + TableIdentifier identifier = TableIdentifier.of(namespace, request.name()); + Table table = catalog.registerTable(identifier, request.metadataLocation()); + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static void dropTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, false); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static void purgeTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, true); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static void tableExists(Catalog catalog, TableIdentifier ident) { + boolean exists = catalog.tableExists(ident); + if (!exists) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { + Table table = catalog.loadTable(ident); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } else if (table instanceof BaseMetadataTable) { + // metadata tables are loaded on the client side, return NoSuchTableException for now + throw new NoSuchTableException("Table does not exist: %s", ident.toString()); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static LoadTableResponse updateTable( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + TableMetadata finalMetadata; + if (isCreate(request)) { + // this is a hacky way to get TableOperations for an uncommitted table + Transaction transaction = + catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); + if (transaction instanceof BaseTransaction) { + BaseTransaction baseTransaction = (BaseTransaction) transaction; + finalMetadata = create(baseTransaction.underlyingOps(), request); + } else { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTransaction"); + } + + } else { + Table table = catalog.loadTable(ident); + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + finalMetadata = commit(ops, request); + } else { throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - } - - public static void dropTable(Catalog catalog, TableIdentifier ident) { - boolean dropped = catalog.dropTable(ident, false); - if (!dropped) { - throw new NoSuchTableException("Table does not exist: %s", ident); - } - } - - public static void purgeTable(Catalog catalog, TableIdentifier ident) { - boolean dropped = catalog.dropTable(ident, true); - if (!dropped) { - throw new NoSuchTableException("Table does not exist: %s", ident); - } - } - - public static void tableExists(Catalog catalog, TableIdentifier ident) { - boolean exists = catalog.tableExists(ident); - if (!exists) { - throw new NoSuchTableException("Table does not exist: %s", ident); - } - } - - public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { - Table table = catalog.loadTable(ident); - - if (table instanceof BaseTable) { - return LoadTableResponse.builder() - .withTableMetadata(((BaseTable) table).operations().current()) - .build(); - } else if (table instanceof BaseMetadataTable) { - // metadata tables are loaded on the client side, return NoSuchTableException for now - throw new NoSuchTableException("Table does not exist: %s", ident.toString()); - } - - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - } - - public static LoadTableResponse updateTable( - Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { - TableMetadata finalMetadata; - if (isCreate(request)) { - // this is a hacky way to get TableOperations for an uncommitted table - Transaction transaction = - catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); - if (transaction instanceof BaseTransaction) { - BaseTransaction baseTransaction = (BaseTransaction) transaction; - finalMetadata = create(baseTransaction.underlyingOps(), request); - } else { - throw new IllegalStateException( - "Cannot wrap catalog that does not produce BaseTransaction"); - } - - } else { - Table table = catalog.loadTable(ident); - if (table instanceof BaseTable) { - TableOperations ops = ((BaseTable) table).operations(); - finalMetadata = commit(ops, request); - } else { - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - } - } - - return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); - } - - public static void renameTable(Catalog catalog, RenameTableRequest request) { - catalog.renameTable(request.source(), request.destination()); - } - - private static boolean isCreate(UpdateTableRequest request) { - boolean isCreate = - request.requirements().stream() - .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); - - if (isCreate) { - List invalidRequirements = - request.requirements().stream() - .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) - .collect(Collectors.toList()); - Preconditions.checkArgument( - invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); - } - - return isCreate; - } - - private static TableMetadata create(TableOperations ops, UpdateTableRequest request) { - // the only valid requirement is that the table will be created - request.requirements().forEach(requirement -> requirement.validate(ops.current())); - Optional formatVersion = - request.updates().stream() - .filter(update -> update instanceof UpgradeFormatVersion) - .map(update -> ((UpgradeFormatVersion) update).formatVersion()) - .findFirst(); - - TableMetadata.Builder builder = - formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty); - request.updates().forEach(update -> update.applyTo(builder)); - // create transactions do not retry. if the table exists, retrying is not a solution - ops.commit(null, builder.build()); - - return ops.current(); - } - - static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { - AtomicBoolean isRetry = new AtomicBoolean(false); - try { - Tasks.foreach(ops) - .retry(COMMIT_NUM_RETRIES_DEFAULT) - .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - 2.0 /* exponential */) - .onlyRetryOn(CommitFailedException.class) - .run( - taskOps -> { - TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); - isRetry.set(true); - - // validate requirements - try { - request.requirements().forEach(requirement -> requirement.validate(base)); - } catch (CommitFailedException e) { - // wrap and rethrow outside of tasks to avoid unnecessary retry - throw new ValidationFailureException(e); - } - - // apply changes - TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); - request.updates().forEach(update -> update.applyTo(metadataBuilder)); - - TableMetadata updated = metadataBuilder.build(); - if (updated.changes().isEmpty()) { - // do not commit if the metadata has not changed - return; - } - - // commit - taskOps.commit(base, updated); - }); - - } catch (ValidationFailureException e) { - throw e.wrapped(); - } - - return ops.current(); - } - - private static BaseView asBaseView(View view) { - Preconditions.checkState( - view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView"); - return (BaseView) view; - } - - public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { - return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build(); - } - - public static ListTablesResponse listViews( - ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { - List results = catalog.listViews(namespace); - - Pair, String> page = - paginate(results, pageToken, Integer.parseInt(pageSize)); - - return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); - } - - public static LoadViewResponse createView( - ViewCatalog catalog, Namespace namespace, CreateViewRequest request) { - request.validate(); - - ViewBuilder viewBuilder = - catalog - .buildView(TableIdentifier.of(namespace, request.name())) - .withSchema(request.schema()) - .withProperties(request.properties()) - .withDefaultNamespace(request.viewVersion().defaultNamespace()) - .withDefaultCatalog(request.viewVersion().defaultCatalog()) - .withLocation(request.location()); - - Set unsupportedRepresentations = - request.viewVersion().representations().stream() - .filter(r -> !(r instanceof SQLViewRepresentation)) - .map(ViewRepresentation::type) - .collect(Collectors.toSet()); - - if (!unsupportedRepresentations.isEmpty()) { - throw new IllegalStateException( - String.format("Found unsupported view representations: %s", unsupportedRepresentations)); - } - + } + } + + return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); + } + + public static void renameTable(Catalog catalog, RenameTableRequest request) { + catalog.renameTable(request.source(), request.destination()); + } + + private static boolean isCreate(UpdateTableRequest request) { + boolean isCreate = + request.requirements().stream() + .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List invalidRequirements = + request.requirements().stream() + .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument( + invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + + private static TableMetadata create(TableOperations ops, UpdateTableRequest request) { + // the only valid requirement is that the table will be created + request.requirements().forEach(requirement -> requirement.validate(ops.current())); + Optional formatVersion = + request.updates().stream() + .filter(update -> update instanceof UpgradeFormatVersion) + .map(update -> ((UpgradeFormatVersion) update).formatVersion()) + .findFirst(); + + TableMetadata.Builder builder = + formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty); + request.updates().forEach(update -> update.applyTo(builder)); + // create transactions do not retry. if the table exists, retrying is not a solution + ops.commit(null, builder.build()); + + return ops.current(); + } + + static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(COMMIT_NUM_RETRIES_DEFAULT) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + TableMetadata updated = metadataBuilder.build(); + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } + + private static BaseView asBaseView(View view) { + Preconditions.checkState( + view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView"); + return (BaseView) view; + } + + public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { + return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build(); + } + + public static ListTablesResponse listViews( + ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { + List results = catalog.listViews(namespace); + + Pair, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public static LoadViewResponse createView( + ViewCatalog catalog, Namespace namespace, CreateViewRequest request) { + request.validate(); + + ViewBuilder viewBuilder = + catalog + .buildView(TableIdentifier.of(namespace, request.name())) + .withSchema(request.schema()) + .withProperties(request.properties()) + .withDefaultNamespace(request.viewVersion().defaultNamespace()) + .withDefaultCatalog(request.viewVersion().defaultCatalog()) + .withLocation(request.location()); + + Set unsupportedRepresentations = request.viewVersion().representations().stream() - .filter(SQLViewRepresentation.class::isInstance) - .map(SQLViewRepresentation.class::cast) - .forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql())); - - View view = viewBuilder.create(); - - return viewResponse(view); - } - - private static LoadViewResponse viewResponse(View view) { - ViewMetadata metadata = asBaseView(view).operations().current(); - return ImmutableLoadViewResponse.builder() - .metadata(metadata) - .metadataLocation(metadata.metadataFileLocation()) - .build(); - } - - public static void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { - if (!catalog.viewExists(viewIdentifier)) { - throw new NoSuchViewException("View does not exist: %s", viewIdentifier); - } - } - - public static LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { - View view = catalog.loadView(viewIdentifier); - return viewResponse(view); - } - - public static LoadViewResponse updateView( - ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) { - View view = catalog.loadView(ident); - ViewMetadata metadata = commit(asBaseView(view).operations(), request); - - return ImmutableLoadViewResponse.builder() - .metadata(metadata) - .metadataLocation(metadata.metadataFileLocation()) - .build(); - } - - public static void renameView(ViewCatalog catalog, RenameTableRequest request) { - catalog.renameView(request.source(), request.destination()); - } - - public static void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { - boolean dropped = catalog.dropView(viewIdentifier); - if (!dropped) { - throw new NoSuchViewException("View does not exist: %s", viewIdentifier); - } - } - - static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { - AtomicBoolean isRetry = new AtomicBoolean(false); - try { - Tasks.foreach(ops) - .retry(COMMIT_NUM_RETRIES_DEFAULT) - .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - 2.0 /* exponential */) - .onlyRetryOn(CommitFailedException.class) - .run( - taskOps -> { - ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); - isRetry.set(true); - - // validate requirements - try { - request.requirements().forEach(requirement -> requirement.validate(base)); - } catch (CommitFailedException e) { - // wrap and rethrow outside of tasks to avoid unnecessary retry - throw new ValidationFailureException(e); - } - - // apply changes - ViewMetadata.Builder metadataBuilder = ViewMetadata.buildFrom(base); - request.updates().forEach(update -> update.applyTo(metadataBuilder)); - - ViewMetadata updated = metadataBuilder.build(); - - if (updated.changes().isEmpty()) { - // do not commit if the metadata has not changed - return; - } - - // commit - taskOps.commit(base, updated); - }); - - } catch (ValidationFailureException e) { - throw e.wrapped(); - } - - return ops.current(); - } -} \ No newline at end of file + .filter(r -> !(r instanceof SQLViewRepresentation)) + .map(ViewRepresentation::type) + .collect(Collectors.toSet()); + + if (!unsupportedRepresentations.isEmpty()) { + throw new IllegalStateException( + String.format("Found unsupported view representations: %s", unsupportedRepresentations)); + } + + request.viewVersion().representations().stream() + .filter(SQLViewRepresentation.class::isInstance) + .map(SQLViewRepresentation.class::cast) + .forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql())); + + View view = viewBuilder.create(); + + return viewResponse(view); + } + + private static LoadViewResponse viewResponse(View view) { + ViewMetadata metadata = asBaseView(view).operations().current(); + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public static void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { + if (!catalog.viewExists(viewIdentifier)) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + public static LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + View view = catalog.loadView(viewIdentifier); + return viewResponse(view); + } + + public static LoadViewResponse updateView( + ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) { + View view = catalog.loadView(ident); + ViewMetadata metadata = commit(asBaseView(view).operations(), request); + + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public static void renameView(ViewCatalog catalog, RenameTableRequest request) { + catalog.renameView(request.source(), request.destination()); + } + + public static void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + boolean dropped = catalog.dropView(viewIdentifier); + if (!dropped) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(COMMIT_NUM_RETRIES_DEFAULT) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + ViewMetadata.Builder metadataBuilder = ViewMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + ViewMetadata updated = metadataBuilder.build(); + + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 4484dc1291..4c9c527d6f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -35,7 +35,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; -import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.BadRequestException; @@ -85,9 +84,8 @@ import org.slf4j.LoggerFactory; /** - * {@link IcebergRestCatalogApiService} implementation that delegates operations to {@link - * org.apache.iceberg.rest.CatalogHandlers} after finding the appropriate {@link Catalog} for the - * current {@link RealmContext}. + * An adapter between generated service types like `IcebergRestCatalogApiService` and + * `IcebergCatalogHandler`. */ @RequestScoped public class IcebergCatalogAdapter @@ -140,6 +138,7 @@ public class IcebergCatalogAdapter private final PolarisAuthorizer polarisAuthorizer; private final CatalogPrefixParser prefixParser; private final ReservedProperties reservedProperties; + private final CatalogHandlerUtils catalogHandlerUtils; @Inject public IcebergCatalogAdapter( @@ -151,7 +150,8 @@ public IcebergCatalogAdapter( UserSecretsManager userSecretsManager, PolarisAuthorizer polarisAuthorizer, CatalogPrefixParser prefixParser, - ReservedProperties reservedProperties) { + ReservedProperties reservedProperties, + CatalogHandlerUtils catalogHandlerUtils) { this.realmContext = realmContext; this.callContext = callContext; this.catalogFactory = catalogFactory; @@ -161,6 +161,7 @@ public IcebergCatalogAdapter( this.polarisAuthorizer = polarisAuthorizer; this.prefixParser = prefixParser; this.reservedProperties = reservedProperties; + this.catalogHandlerUtils = catalogHandlerUtils; // FIXME: This is a hack to set the current context for downstream calls. CallContext.setCurrentContext(callContext); @@ -199,7 +200,8 @@ private IcebergCatalogHandler newHandlerWrapper( catalogFactory, catalogName, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); } @Override diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index aca12eb6fb..5fa6c0a248 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -128,6 +128,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final UserSecretsManager userSecretsManager; private final CallContextCatalogFactory catalogFactory; private final ReservedProperties reservedProperties; + private final CatalogHandlerUtils catalogHandlerUtils; // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. @@ -147,12 +148,14 @@ public IcebergCatalogHandler( CallContextCatalogFactory catalogFactory, String catalogName, PolarisAuthorizer authorizer, - ReservedProperties reservedProperties) { + ReservedProperties reservedProperties, + CatalogHandlerUtils catalogHandlerUtils) { super(callContext, entityManager, securityContext, catalogName, authorizer); this.metaStoreManager = metaStoreManager; this.userSecretsManager = userSecretsManager; this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; + this.catalogHandlerUtils = catalogHandlerUtils; } /** diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 34a3caf454..013488b6d9 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -49,6 +49,7 @@ import org.apache.polaris.service.catalog.DefaultCatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi; import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApi; +import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; @@ -191,6 +192,8 @@ public Map contextVariables() { ReservedProperties reservedProperties = ReservedProperties.NONE; + CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(configurationStore); + IcebergCatalogAdapter service = new IcebergCatalogAdapter( realmContext, @@ -201,7 +204,8 @@ public Map contextVariables() { userSecretsManager, authorizer, new DefaultCatalogPrefixParser(), - reservedProperties); + reservedProperties, + catalogHandlerUtils); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service); IcebergRestConfigurationApi restConfigurationApi = new IcebergRestConfigurationApi(service); From 72b11aa80cebafd37597a9c627cca3490be84a84 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:47:56 -0700 Subject: [PATCH 3/9] De-static, add config --- .../core/config/FeatureConfiguration.java | 8 ++ .../catalog/iceberg/CatalogHandlerUtils.java | 86 +++++++++++-------- .../iceberg/IcebergCatalogHandler.java | 2 +- .../apache/polaris/service/TestServices.java | 3 +- 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 64e2ae1a3f..15582dcf04 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -249,4 +249,12 @@ public static void enforceFeatureEnabledOrThrow( .description("The list of supported catalog connection types for federation") .defaultValue(List.of(ConnectionType.ICEBERG_REST.name())) .buildFeatureConfiguration(); + + public static final FeatureConfiguration ICEBERG_COMMIT_MAX_RETRIES = + PolarisConfiguration.builder() + .key("ICEBERG_COMMIT_MAX_RETRIES") + .catalogConfig("polaris.config.iceberg-commit-max-retries") + .description("The max number of times to try committing to an Iceberg table") + .defaultValue(4) + .buildFeatureConfiguration(); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java index 4e08e1b60b..4fc9c04a1f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import com.google.common.base.Preconditions; @@ -83,6 +82,9 @@ import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewOperations; import org.apache.iceberg.view.ViewRepresentation; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.config.BehaviorChangeConfiguration; +import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.PolarisConfigurationStore; /** @@ -94,10 +96,14 @@ public class CatalogHandlerUtils { private static final Schema EMPTY_SCHEMA = new Schema(); private static final String INITIAL_PAGE_TOKEN = ""; + private final PolarisCallContext polarisCallContext; private final PolarisConfigurationStore configurationStore; @Inject - public CatalogHandlerUtils(PolarisConfigurationStore configurationStore) { + public CatalogHandlerUtils( + PolarisCallContext polarisCallContext, + PolarisConfigurationStore configurationStore) { + this.polarisCallContext = polarisCallContext; this.configurationStore = configurationStore; } @@ -123,7 +129,7 @@ public CommitFailedException wrapped() { } } - private static Pair, String> paginate(List list, String pageToken, int pageSize) { + private Pair, String> paginate(List list, String pageToken, int pageSize) { int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); if (pageStart >= list.size()) { return Pair.of(Collections.emptyList(), null); @@ -136,7 +142,7 @@ private static Pair, String> paginate(List list, String pageToken return Pair.of(subList, nextPageToken); } - public static ListNamespacesResponse listNamespaces( + public ListNamespacesResponse listNamespaces( SupportsNamespaces catalog, Namespace parent) { List results; if (parent.isEmpty()) { @@ -148,7 +154,7 @@ public static ListNamespacesResponse listNamespaces( return ListNamespacesResponse.builder().addAll(results).build(); } - public static ListNamespacesResponse listNamespaces( + public ListNamespacesResponse listNamespaces( SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { List results; @@ -166,7 +172,7 @@ public static ListNamespacesResponse listNamespaces( .build(); } - public static CreateNamespaceResponse createNamespace( + public CreateNamespaceResponse createNamespace( SupportsNamespaces catalog, CreateNamespaceRequest request) { Namespace namespace = request.namespace(); catalog.createNamespace(namespace, request.properties()); @@ -176,13 +182,13 @@ public static CreateNamespaceResponse createNamespace( .build(); } - public static void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { + public void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { if (!catalog.namespaceExists(namespace)) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } } - public static GetNamespaceResponse loadNamespace( + public GetNamespaceResponse loadNamespace( SupportsNamespaces catalog, Namespace namespace) { Map properties = catalog.loadNamespaceMetadata(namespace); return GetNamespaceResponse.builder() @@ -191,14 +197,14 @@ public static GetNamespaceResponse loadNamespace( .build(); } - public static void dropNamespace(SupportsNamespaces catalog, Namespace namespace) { + public void dropNamespace(SupportsNamespaces catalog, Namespace namespace) { boolean dropped = catalog.dropNamespace(namespace); if (!dropped) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } } - public static UpdateNamespacePropertiesResponse updateNamespaceProperties( + public UpdateNamespacePropertiesResponse updateNamespaceProperties( SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { request.validate(); @@ -224,12 +230,12 @@ public static UpdateNamespacePropertiesResponse updateNamespaceProperties( .build(); } - public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) { + public ListTablesResponse listTables(Catalog catalog, Namespace namespace) { List idents = catalog.listTables(namespace); return ListTablesResponse.builder().addAll(idents).build(); } - public static ListTablesResponse listTables( + public ListTablesResponse listTables( Catalog catalog, Namespace namespace, String pageToken, String pageSize) { List results = catalog.listTables(namespace); @@ -239,7 +245,7 @@ public static ListTablesResponse listTables( return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); } - public static LoadTableResponse stageTableCreate( + public LoadTableResponse stageTableCreate( Catalog catalog, Namespace namespace, CreateTableRequest request) { request.validate(); @@ -278,7 +284,7 @@ public static LoadTableResponse stageTableCreate( return LoadTableResponse.builder().withTableMetadata(metadata).build(); } - public static LoadTableResponse createTable( + public LoadTableResponse createTable( Catalog catalog, Namespace namespace, CreateTableRequest request) { request.validate(); @@ -301,7 +307,7 @@ public static LoadTableResponse createTable( throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } - public static LoadTableResponse registerTable( + public LoadTableResponse registerTable( Catalog catalog, Namespace namespace, RegisterTableRequest request) { request.validate(); @@ -316,28 +322,28 @@ public static LoadTableResponse registerTable( throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } - public static void dropTable(Catalog catalog, TableIdentifier ident) { + public void dropTable(Catalog catalog, TableIdentifier ident) { boolean dropped = catalog.dropTable(ident, false); if (!dropped) { throw new NoSuchTableException("Table does not exist: %s", ident); } } - public static void purgeTable(Catalog catalog, TableIdentifier ident) { + public void purgeTable(Catalog catalog, TableIdentifier ident) { boolean dropped = catalog.dropTable(ident, true); if (!dropped) { throw new NoSuchTableException("Table does not exist: %s", ident); } } - public static void tableExists(Catalog catalog, TableIdentifier ident) { + public void tableExists(Catalog catalog, TableIdentifier ident) { boolean exists = catalog.tableExists(ident); if (!exists) { throw new NoSuchTableException("Table does not exist: %s", ident); } } - public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { + public LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { Table table = catalog.loadTable(ident); if (table instanceof BaseTable) { @@ -352,7 +358,7 @@ public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } - public static LoadTableResponse updateTable( + public LoadTableResponse updateTable( Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { TableMetadata finalMetadata; if (isCreate(request)) { @@ -380,11 +386,11 @@ public static LoadTableResponse updateTable( return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); } - public static void renameTable(Catalog catalog, RenameTableRequest request) { + public void renameTable(Catalog catalog, RenameTableRequest request) { catalog.renameTable(request.source(), request.destination()); } - private static boolean isCreate(UpdateTableRequest request) { + private boolean isCreate(UpdateTableRequest request) { boolean isCreate = request.requirements().stream() .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); @@ -401,7 +407,7 @@ private static boolean isCreate(UpdateTableRequest request) { return isCreate; } - private static TableMetadata create(TableOperations ops, UpdateTableRequest request) { + private TableMetadata create(TableOperations ops, UpdateTableRequest request) { // the only valid requirement is that the table will be created request.requirements().forEach(requirement -> requirement.validate(ops.current())); Optional formatVersion = @@ -419,11 +425,11 @@ private static TableMetadata create(TableOperations ops, UpdateTableRequest requ return ops.current(); } - static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + protected TableMetadata commit(TableOperations ops, UpdateTableRequest request) { AtomicBoolean isRetry = new AtomicBoolean(false); try { Tasks.foreach(ops) - .retry(COMMIT_NUM_RETRIES_DEFAULT) + .retry(maxCommitRetries()) .exponentialBackoff( COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, @@ -464,17 +470,17 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { return ops.current(); } - private static BaseView asBaseView(View view) { + private BaseView asBaseView(View view) { Preconditions.checkState( view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView"); return (BaseView) view; } - public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { + public ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build(); } - public static ListTablesResponse listViews( + public ListTablesResponse listViews( ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { List results = catalog.listViews(namespace); @@ -484,7 +490,7 @@ public static ListTablesResponse listViews( return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); } - public static LoadViewResponse createView( + public LoadViewResponse createView( ViewCatalog catalog, Namespace namespace, CreateViewRequest request) { request.validate(); @@ -518,7 +524,7 @@ public static LoadViewResponse createView( return viewResponse(view); } - private static LoadViewResponse viewResponse(View view) { + private LoadViewResponse viewResponse(View view) { ViewMetadata metadata = asBaseView(view).operations().current(); return ImmutableLoadViewResponse.builder() .metadata(metadata) @@ -526,18 +532,18 @@ private static LoadViewResponse viewResponse(View view) { .build(); } - public static void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { + public void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { if (!catalog.viewExists(viewIdentifier)) { throw new NoSuchViewException("View does not exist: %s", viewIdentifier); } } - public static LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + public LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { View view = catalog.loadView(viewIdentifier); return viewResponse(view); } - public static LoadViewResponse updateView( + public LoadViewResponse updateView( ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) { View view = catalog.loadView(ident); ViewMetadata metadata = commit(asBaseView(view).operations(), request); @@ -548,22 +554,22 @@ public static LoadViewResponse updateView( .build(); } - public static void renameView(ViewCatalog catalog, RenameTableRequest request) { + public void renameView(ViewCatalog catalog, RenameTableRequest request) { catalog.renameView(request.source(), request.destination()); } - public static void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + public void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { boolean dropped = catalog.dropView(viewIdentifier); if (!dropped) { throw new NoSuchViewException("View does not exist: %s", viewIdentifier); } } - static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { + protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { AtomicBoolean isRetry = new AtomicBoolean(false); try { Tasks.foreach(ops) - .retry(COMMIT_NUM_RETRIES_DEFAULT) + .retry(maxCommitRetries()) .exponentialBackoff( COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, @@ -604,4 +610,10 @@ static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { return ops.current(); } + + private int maxCommitRetries() { + return configurationStore.getConfiguration( + polarisCallContext, + FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 5fa6c0a248..49dbbcd06f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -191,7 +191,7 @@ public ListNamespacesResponse listNamespaces( .nextPageToken(results.pageToken.toTokenString()) .build(); } else { - return CatalogHandlers.listNamespaces( + return catalogHandlerUtils.listNamespaces( namespaceCatalog, parent, pageToken, String.valueOf(pageSize)); } } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 013488b6d9..7be8adc8ff 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -192,7 +192,8 @@ public Map contextVariables() { ReservedProperties reservedProperties = ReservedProperties.NONE; - CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(configurationStore); + CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils( + callContext.getPolarisCallContext(), configurationStore); IcebergCatalogAdapter service = new IcebergCatalogAdapter( From 4515983a94759c8159171afec63f4a66ee8312f5 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:48:07 -0700 Subject: [PATCH 4/9] autolint --- .../catalog/iceberg/CatalogHandlerUtils.java | 13 ++++--------- .../org/apache/polaris/service/TestServices.java | 4 ++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java index 4fc9c04a1f..39f9b33526 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -83,7 +83,6 @@ import org.apache.iceberg.view.ViewOperations; import org.apache.iceberg.view.ViewRepresentation; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.config.BehaviorChangeConfiguration; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.PolarisConfigurationStore; @@ -101,8 +100,7 @@ public class CatalogHandlerUtils { @Inject public CatalogHandlerUtils( - PolarisCallContext polarisCallContext, - PolarisConfigurationStore configurationStore) { + PolarisCallContext polarisCallContext, PolarisConfigurationStore configurationStore) { this.polarisCallContext = polarisCallContext; this.configurationStore = configurationStore; } @@ -142,8 +140,7 @@ private Pair, String> paginate(List list, String pageToken, int p return Pair.of(subList, nextPageToken); } - public ListNamespacesResponse listNamespaces( - SupportsNamespaces catalog, Namespace parent) { + public ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespace parent) { List results; if (parent.isEmpty()) { results = catalog.listNamespaces(); @@ -188,8 +185,7 @@ public void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { } } - public GetNamespaceResponse loadNamespace( - SupportsNamespaces catalog, Namespace namespace) { + public GetNamespaceResponse loadNamespace(SupportsNamespaces catalog, Namespace namespace) { Map properties = catalog.loadNamespaceMetadata(namespace); return GetNamespaceResponse.builder() .withNamespace(namespace) @@ -613,7 +609,6 @@ protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { private int maxCommitRetries() { return configurationStore.getConfiguration( - polarisCallContext, - FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES); + polarisCallContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES); } } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 7be8adc8ff..4301fe9933 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -192,8 +192,8 @@ public Map contextVariables() { ReservedProperties reservedProperties = ReservedProperties.NONE; - CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils( - callContext.getPolarisCallContext(), configurationStore); + CatalogHandlerUtils catalogHandlerUtils = + new CatalogHandlerUtils(callContext.getPolarisCallContext(), configurationStore); IcebergCatalogAdapter service = new IcebergCatalogAdapter( From 0e30d21411923a5f8d1246609c90b4f0d99b6426 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:50:54 -0700 Subject: [PATCH 5/9] license entry --- LICENSE | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE b/LICENSE index 7b965b4a5d..30e2e02748 100644 --- a/LICENSE +++ b/LICENSE @@ -219,6 +219,7 @@ This product includes code from Apache Iceberg. * spec/polaris-catalog-apis/oauth-tokens-api.yaml * integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java * service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java Copyright: Copyright 2017-2025 The Apache Software Foundation Home page: https://iceberg.apache.org From 456ccb65c8276e394908f88a7825028c09b35b31 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:52:24 -0700 Subject: [PATCH 6/9] cut over --- .../iceberg/IcebergCatalogHandler.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 49dbbcd06f..aa4f025efc 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -258,7 +258,7 @@ public ListNamespacesResponse listNamespaces(Namespace parent) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES; authorizeBasicNamespaceOperationOrThrow(op, parent); - return CatalogHandlers.listNamespaces(namespaceCatalog, parent); + return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent); } public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) { @@ -294,7 +294,7 @@ public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) { .setProperties(filteredProperties) .build(); } else { - return CatalogHandlers.createNamespace(namespaceCatalog, request); + return catalogHandlerUtils.createNamespace(namespaceCatalog, request); } } @@ -308,7 +308,7 @@ public GetNamespaceResponse loadNamespaceMetadata(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_NAMESPACE_METADATA; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.loadNamespace(namespaceCatalog, namespace); + return catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace); } public void namespaceExists(Namespace namespace) { @@ -323,14 +323,14 @@ public void namespaceExists(Namespace namespace) { authorizeBasicNamespaceOperationOrThrow(op, namespace); // TODO: Just skip CatalogHandlers for this one maybe - CatalogHandlers.loadNamespace(namespaceCatalog, namespace); + catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace); } public void dropNamespace(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_NAMESPACE; authorizeBasicNamespaceOperationOrThrow(op, namespace); - CatalogHandlers.dropNamespace(namespaceCatalog, namespace); + catalogHandlerUtils.dropNamespace(namespaceCatalog, namespace); } public UpdateNamespacePropertiesResponse updateNamespaceProperties( @@ -338,7 +338,7 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties( PolarisAuthorizableOperation op = PolarisAuthorizableOperation.UPDATE_NAMESPACE_PROPERTIES; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request); + return catalogHandlerUtils.updateNamespaceProperties(namespaceCatalog, namespace, request); } public ListTablesResponse listTables(Namespace namespace, String pageToken, Integer pageSize) { @@ -352,7 +352,7 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte .nextPageToken(results.pageToken.toTokenString()) .build(); } else { - return CatalogHandlers.listTables( + return catalogHandlerUtils.listTables( baseCatalog, namespace, pageToken, String.valueOf(pageSize)); } } @@ -361,7 +361,7 @@ public ListTablesResponse listTables(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.listTables(baseCatalog, namespace); + return catalogHandlerUtils.listTables(baseCatalog, namespace); } /** @@ -394,7 +394,7 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque .withWriteOrder(request.writeOrder()) .setProperties(reservedProperties.removeReservedProperties(request.properties())) .build(); - return CatalogHandlers.createTable(baseCatalog, namespace, requestWithoutReservedProperties); + return catalogHandlerUtils.createTable(baseCatalog, namespace, requestWithoutReservedProperties); } /** @@ -556,7 +556,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest authorizeCreateTableLikeUnderNamespaceOperationOrThrow( op, TableIdentifier.of(namespace, request.name())); - return CatalogHandlers.registerTable(baseCatalog, namespace, request); + return catalogHandlerUtils.registerTable(baseCatalog, namespace, request); } public boolean sendNotification(TableIdentifier identifier, NotificationRequest request) { @@ -649,7 +649,7 @@ public Optional loadTableIfStale( } } - LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + LoadTableResponse rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); return Optional.of(filterResponseToSnapshots(rawResponse, snapshots)); } @@ -827,7 +827,7 @@ public LoadTableResponse updateTable( if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public LoadTableResponse updateTableForStagedCreate( @@ -844,7 +844,7 @@ public LoadTableResponse updateTableForStagedCreate( if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { @@ -852,7 +852,7 @@ public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { authorizeBasicTableLikeOperationOrThrow( op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); - CatalogHandlers.dropTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.dropTable(baseCatalog, tableIdentifier); } public void dropTableWithPurge(TableIdentifier tableIdentifier) { @@ -869,7 +869,7 @@ public void dropTableWithPurge(TableIdentifier tableIdentifier) { if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot drop table on static-facade external catalogs."); } - CatalogHandlers.purgeTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.purgeTable(baseCatalog, tableIdentifier); } public void tableExists(TableIdentifier tableIdentifier) { @@ -878,7 +878,7 @@ public void tableExists(TableIdentifier tableIdentifier) { op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); } public void renameTable(RenameTableRequest request) { @@ -895,7 +895,7 @@ public void renameTable(RenameTableRequest request) { if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot rename table on static-facade external catalogs."); } - CatalogHandlers.renameTable(baseCatalog, request); + catalogHandlerUtils.renameTable(baseCatalog, request); } public void commitTransaction(CommitTransactionRequest commitTransactionRequest) { @@ -1015,7 +1015,7 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ .nextPageToken(results.pageToken.toTokenString()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { - return CatalogHandlers.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize)); } else { throw new BadRequestException( "Unsupported operation: listViews with baseCatalog type: %s", @@ -1027,7 +1027,7 @@ public ListTablesResponse listViews(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.listViews(viewCatalog, namespace); + return catalogHandlerUtils.listViews(viewCatalog, namespace); } public LoadViewResponse createView(Namespace namespace, CreateViewRequest request) { @@ -1044,14 +1044,14 @@ public LoadViewResponse createView(Namespace namespace, CreateViewRequest reques if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot create view on static-facade external catalogs."); } - return CatalogHandlers.createView(viewCatalog, namespace, request); + return catalogHandlerUtils.createView(viewCatalog, namespace, request); } public LoadViewResponse loadView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); - return CatalogHandlers.loadView(viewCatalog, viewIdentifier); + return catalogHandlerUtils.loadView(viewCatalog, viewIdentifier); } public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableRequest request) { @@ -1067,14 +1067,14 @@ public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableR if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot replace view on static-facade external catalogs."); } - return CatalogHandlers.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request)); } public void dropView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); - CatalogHandlers.dropView(viewCatalog, viewIdentifier); + catalogHandlerUtils.dropView(viewCatalog, viewIdentifier); } public void viewExists(TableIdentifier viewIdentifier) { @@ -1082,7 +1082,7 @@ public void viewExists(TableIdentifier viewIdentifier) { authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - CatalogHandlers.loadView(viewCatalog, viewIdentifier); + catalogHandlerUtils.loadView(viewCatalog, viewIdentifier); } public void renameView(RenameTableRequest request) { @@ -1099,7 +1099,7 @@ public void renameView(RenameTableRequest request) { if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot rename view on static-facade external catalogs."); } - CatalogHandlers.renameView(viewCatalog, request); + catalogHandlerUtils.renameView(viewCatalog, request); } private @Nonnull LoadTableResponse filterResponseToSnapshots( From 15519a7de153d302c7ce3eb2d1e77966d6a7431b Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Mon, 12 May 2025 23:55:39 -0700 Subject: [PATCH 7/9] pull --- helm/polaris/README.md | 12 ++++++------ .../catalog/iceberg/IcebergCatalogHandler.java | 13 ++++++++----- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/helm/polaris/README.md b/helm/polaris/README.md index f67736614f..886cd4ed3f 100644 --- a/helm/polaris/README.md +++ b/helm/polaris/README.md @@ -48,7 +48,7 @@ A Helm chart for Apache Polaris (incubating). ### Optional -When using EclipseLink backed metastore a custom `persistence.xml` is required, a Kubernetes Secret must be created for it. Below is a sample command: +When using a custom `persistence.xml`, a Kubernetes Secret must be created for it. Below is a sample command: ```bash kubectl create secret generic polaris-secret -n polaris --from-file=persistence.xml ``` @@ -67,7 +67,7 @@ helm unittest helm/polaris The below instructions assume Kind and Helm are installed. Simply run the `run.sh` script from the Polaris repo root, making sure to specify the -`--eclipse-link-deps` if using EclipseLink backed metastore, option: +`--eclipse-link-deps` option: ```bash ./run.sh @@ -186,8 +186,8 @@ kubectl delete namespace polaris ## Values - Key | Type | Default | Description | -|-----|------|-----|-------------| +| Key | Type | Default | Description | +|-----|------|---------|-------------| | advancedConfig | object | `{}` | Advanced configuration. You can pass here any valid Polaris or Quarkus configuration property. Any property that is defined here takes precedence over all the other configuration values generated by this chart. Properties can be passed "flattened" or as nested YAML objects (see examples below). Note: values should be strings; avoid using numbers, booleans, or other types. | | affinity | object | `{}` | Affinity and anti-affinity for polaris pods. See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity. | | authentication | object | `{"authenticator":{"type":"default"},"tokenBroker":{"maxTokenGeneration":"PT1H","secret":{"name":null,"privateKey":"private.pem","publicKey":"public.pem","secretKey":"secret"},"type":"rsa-key-pair"},"tokenService":{"type":"default"}}` | Polaris authentication configuration. | @@ -285,7 +285,7 @@ kubectl delete namespace polaris | persistence.eclipseLink.secret | object | `{"key":"persistence.xml","name":null}` | The secret name to pull persistence.xml from. | | persistence.eclipseLink.secret.key | string | `"persistence.xml"` | The key in the secret to pull persistence.xml from. | | persistence.eclipseLink.secret.name | string | `nil` | The name of the secret to pull persistence.xml from. If not provided, the default built-in persistence.xml will be used. This is probably not what you want. | -| persistence.type | string | `"relational-jdbc"` | Three built-in types are available: "relational-jdbc", "in-memory", "eclipse-link". The in-memory type is not recommended for production use. The eclipse-link type is deprecated and will be unsupported in a future release. | +| persistence.type | string | `"eclipse-link"` | The type of persistence to use. Two built-in types are supported: in-memory and eclipse-link. | | podAnnotations | object | `{}` | Annotations to apply to polaris pods. | | podLabels | object | `{}` | Additional Labels to apply to polaris pods. | | podSecurityContext | object | `{"fsGroup":10001,"seccompProfile":{"type":"RuntimeDefault"}}` | Security context for the polaris pod. See https://kubernetes.io/docs/tasks/configure-pod-container/security-context/. | @@ -343,4 +343,4 @@ kubectl delete namespace polaris | tracing.attributes | object | `{}` | Resource attributes to identify the polaris service among other tracing sources. See https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/#service. If left empty, traces will be attached to a service named "Apache Polaris"; to change this, provide a service.name attribute here. | | tracing.enabled | bool | `false` | Specifies whether tracing for the polaris server should be enabled. | | tracing.endpoint | string | `"http://otlp-collector:4317"` | The collector endpoint URL to connect to (required). The endpoint URL must have either the http:// or the https:// scheme. The collector must talk the OpenTelemetry protocol (OTLP) and the port must be its gRPC port (by default 4317). See https://quarkus.io/guides/opentelemetry for more information. | -| tracing.sample | string | `"1.0d"` | Which requests should be sampled. Valid values are: "all", "none", or a ratio between 0.0 and "1.0d" (inclusive). E.g. "0.5d" means that 50% of the requests will be sampled. Note: avoid entering numbers here, always prefer a string representation of the ratio. | \ No newline at end of file +| tracing.sample | string | `"1.0d"` | Which requests should be sampled. Valid values are: "all", "none", or a ratio between 0.0 and "1.0d" (inclusive). E.g. "0.5d" means that 50% of the requests will be sampled. Note: avoid entering numbers here, always prefer a string representation of the ratio. | diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index aa4f025efc..972f6871ea 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -55,7 +55,6 @@ import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; @@ -394,7 +393,8 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque .withWriteOrder(request.writeOrder()) .setProperties(reservedProperties.removeReservedProperties(request.properties())) .build(); - return catalogHandlerUtils.createTable(baseCatalog, namespace, requestWithoutReservedProperties); + return catalogHandlerUtils.createTable( + baseCatalog, namespace, requestWithoutReservedProperties); } /** @@ -827,7 +827,8 @@ public LoadTableResponse updateTable( if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return catalogHandlerUtils.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable( + baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public LoadTableResponse updateTableForStagedCreate( @@ -844,7 +845,8 @@ public LoadTableResponse updateTableForStagedCreate( if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return catalogHandlerUtils.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable( + baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { @@ -1015,7 +1017,8 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ .nextPageToken(results.pageToken.toTokenString()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { - return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listViews( + viewCatalog, namespace, pageToken, String.valueOf(pageSize)); } else { throw new BadRequestException( "Unsupported operation: listViews with baseCatalog type: %s", From 1f42050b1b05cb7fca666188a4994c6aee7c7f3f Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 14 May 2025 19:17:27 -0700 Subject: [PATCH 8/9] revert helm doc --- helm/polaris/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/polaris/README.md b/helm/polaris/README.md index 886cd4ed3f..e6707091ff 100644 --- a/helm/polaris/README.md +++ b/helm/polaris/README.md @@ -285,7 +285,7 @@ kubectl delete namespace polaris | persistence.eclipseLink.secret | object | `{"key":"persistence.xml","name":null}` | The secret name to pull persistence.xml from. | | persistence.eclipseLink.secret.key | string | `"persistence.xml"` | The key in the secret to pull persistence.xml from. | | persistence.eclipseLink.secret.name | string | `nil` | The name of the secret to pull persistence.xml from. If not provided, the default built-in persistence.xml will be used. This is probably not what you want. | -| persistence.type | string | `"eclipse-link"` | The type of persistence to use. Two built-in types are supported: in-memory and eclipse-link. | +| persistence.type | string | `"relational-jdbc"` | Three built-in types are available: "relational-jdbc", "in-memory", "eclipse-link". The in-memory type is not recommended for production use. The eclipse-link type is deprecated and will be unsupported in a future release. | | podAnnotations | object | `{}` | Annotations to apply to polaris pods. | | podLabels | object | `{}` | Additional Labels to apply to polaris pods. | | podSecurityContext | object | `{"fsGroup":10001,"seccompProfile":{"type":"RuntimeDefault"}}` | Security context for the polaris pod. See https://kubernetes.io/docs/tasks/configure-pod-container/security-context/. | From 05ad6032fd91c9de3d1ac6feee185c8e99780f71 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 14 May 2025 19:20:10 -0700 Subject: [PATCH 9/9] autolint --- .../polaris/service/quarkus/admin/PolarisAuthzTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index f2de3aca37..8abb4c041d 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -80,8 +80,8 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; -import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.generic.PolarisGenericTableCatalog; +import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.policy.PolicyCatalog;