From 666d38ea909ee87e43da0aa23d1b1dd47705fc4d Mon Sep 17 00:00:00 2001 From: Alexandre Dutra Date: Thu, 20 Mar 2025 14:45:48 +0100 Subject: [PATCH] IcebergCatalogAdapter: close underlying catalog consistently With the revert of #b84f4624db8d0bd5b8920b0df719bcc15666008f by #ccf25df7b055e9d232b88a3f6fe8b4e0a2ab035a, we lost an extra benefit that was included in that change: a fix for the fact that IcebergCatalogHandlerWrapper does not always close its underlying `Catalog`, thus relying on `CallContext` to play the role of the "sweep vehicle" and close everything that was left unclosed at the end of the request processing. This PR re-applies that fix again. --- .../iceberg/IcebergCatalogAdapter.java | 290 +++++++++-------- .../iceberg/IcebergCatalogHandlerWrapper.java | 295 ++++++++---------- .../apache/polaris/service/TestServices.java | 3 - 3 files changed, 290 insertions(+), 298 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index be2574267a..d88ea3a7e4 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -55,14 +55,11 @@ import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; -import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; -import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.PolarisEntityManager; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; @@ -137,9 +134,6 @@ public IcebergCatalogAdapter( CallContextCatalogFactory catalogFactory, PolarisEntityManager entityManager, PolarisMetaStoreManager metaStoreManager, - BasePersistence session, - PolarisConfigurationStore configurationStore, - PolarisDiagnostics diagnostics, PolarisAuthorizer polarisAuthorizer, CatalogPrefixParser prefixParser) { this.realmContext = realmContext; @@ -163,8 +157,7 @@ private Response withCatalog( String prefix, Function action) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - try (IcebergCatalogHandlerWrapper wrapper = - newHandlerWrapper(realmContext, securityContext, catalogName)) { + try (IcebergCatalogHandlerWrapper wrapper = newHandlerWrapper(securityContext, catalogName)) { return action.apply(wrapper); } catch (RuntimeException e) { LOGGER.debug("RuntimeException while operating on catalog. Propagating to caller.", e); @@ -176,7 +169,7 @@ private Response withCatalog( } private IcebergCatalogHandlerWrapper newHandlerWrapper( - RealmContext realmContext, SecurityContext securityContext, String catalogName) { + SecurityContext securityContext, String catalogName) { AuthenticatedPolarisPrincipal authenticatedPrincipal = (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal(); if (authenticatedPrincipal == null) { @@ -199,10 +192,10 @@ public Response createNamespace( CreateNamespaceRequest createNamespaceRequest, RealmContext realmContext, SecurityContext securityContext) { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createNamespace(createNamespaceRequest)) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> Response.ok(catalog.createNamespace(createNamespaceRequest)).build()); } @Override @@ -215,19 +208,19 @@ public Response listNamespaces( SecurityContext securityContext) { Optional namespaceOptional = Optional.ofNullable(parent).map(IcebergCatalogAdapter::decodeNamespace); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .listNamespaces(namespaceOptional.orElse(Namespace.of()))) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> + Response.ok(catalog.listNamespaces(namespaceOptional.orElse(Namespace.of()))).build()); } @Override public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix).loadNamespaceMetadata(ns)) - .build(); + return withCatalog( + securityContext, prefix, catalog -> Response.ok(catalog.loadNamespaceMetadata(ns)).build()); } private static Namespace decodeNamespace(String namespace) { @@ -238,16 +231,26 @@ private static Namespace decodeNamespace(String namespace) { public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - newHandlerWrapper(realmContext, securityContext, prefix).namespaceExists(ns); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.namespaceExists(ns); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override public Response dropNamespace( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - newHandlerWrapper(realmContext, securityContext, prefix).dropNamespace(ns); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.dropNamespace(ns); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -258,10 +261,12 @@ public Response updateProperties( RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .updateNamespaceProperties(ns, updateNamespacePropertiesRequest)) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> + Response.ok(catalog.updateNamespaceProperties(ns, updateNamespacePropertiesRequest)) + .build()); } private EnumSet parseAccessDelegationModes(String accessDelegationMode) { @@ -285,29 +290,25 @@ public Response createTable( EnumSet delegationModes = parseAccessDelegationModes(accessDelegationMode); Namespace ns = decodeNamespace(namespace); - if (createTableRequest.stageCreate()) { - if (delegationModes.isEmpty()) { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createTableStaged(ns, createTableRequest)) - .build(); - } else { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createTableStagedWithWriteDelegation(ns, createTableRequest)) - .build(); - } - } else if (delegationModes.isEmpty()) { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createTableDirect(ns, createTableRequest)) - .build(); - } else { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createTableDirectWithWriteDelegation(ns, createTableRequest)) - .build(); - } + return withCatalog( + securityContext, + prefix, + catalog -> { + if (createTableRequest.stageCreate()) { + if (delegationModes.isEmpty()) { + return Response.ok(catalog.createTableStaged(ns, createTableRequest)).build(); + } else { + return Response.ok( + catalog.createTableStagedWithWriteDelegation(ns, createTableRequest)) + .build(); + } + } else if (delegationModes.isEmpty()) { + return Response.ok(catalog.createTableDirect(ns, createTableRequest)).build(); + } else { + return Response.ok(catalog.createTableDirectWithWriteDelegation(ns, createTableRequest)) + .build(); + } + }); } @Override @@ -319,8 +320,8 @@ public Response listTables( RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok(newHandlerWrapper(realmContext, securityContext, prefix).listTables(ns)) - .build(); + return withCatalog( + securityContext, prefix, catalog -> Response.ok(catalog.listTables(ns)).build()); } @Override @@ -336,17 +337,17 @@ public Response loadTable( parseAccessDelegationModes(accessDelegationMode); Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - if (delegationModes.isEmpty()) { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .loadTable(tableIdentifier, snapshots)) - .build(); - } else { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .loadTableWithAccessDelegation(tableIdentifier, snapshots)) - .build(); - } + return withCatalog( + securityContext, + prefix, + catalog -> { + if (delegationModes.isEmpty()) { + return Response.ok(catalog.loadTable(tableIdentifier, snapshots)).build(); + } else { + return Response.ok(catalog.loadTableWithAccessDelegation(tableIdentifier, snapshots)) + .build(); + } + }); } @Override @@ -358,8 +359,13 @@ public Response tableExists( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - newHandlerWrapper(realmContext, securityContext, prefix).tableExists(tableIdentifier); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.tableExists(tableIdentifier); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -372,14 +378,17 @@ public Response dropTable( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - - if (purgeRequested != null && purgeRequested) { - newHandlerWrapper(realmContext, securityContext, prefix).dropTableWithPurge(tableIdentifier); - } else { - newHandlerWrapper(realmContext, securityContext, prefix) - .dropTableWithoutPurge(tableIdentifier); - } - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + if (purgeRequested != null && purgeRequested) { + catalog.dropTableWithPurge(tableIdentifier); + } else { + catalog.dropTableWithoutPurge(tableIdentifier); + } + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -390,10 +399,10 @@ public Response registerTable( RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .registerTable(ns, registerTableRequest)) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> Response.ok(catalog.registerTable(ns, registerTableRequest)).build()); } @Override @@ -402,8 +411,13 @@ public Response renameTable( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - newHandlerWrapper(realmContext, securityContext, prefix).renameTable(renameTableRequest); - return Response.ok(javax.ws.rs.core.Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.renameTable(renameTableRequest); + return Response.ok(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -416,18 +430,18 @@ public Response updateTable( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - - if (IcebergCatalogHandlerWrapper.isCreate(commitTableRequest)) { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .updateTableForStagedCreate(tableIdentifier, commitTableRequest)) - .build(); - } else { - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .updateTable(tableIdentifier, commitTableRequest)) - .build(); - } + return withCatalog( + securityContext, + prefix, + catalog -> { + if (IcebergCatalogHandlerWrapper.isCreate(commitTableRequest)) { + return Response.ok( + catalog.updateTableForStagedCreate(tableIdentifier, commitTableRequest)) + .build(); + } else { + return Response.ok(catalog.updateTable(tableIdentifier, commitTableRequest)).build(); + } + }); } @Override @@ -438,10 +452,10 @@ public Response createView( RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .createView(ns, createViewRequest)) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> Response.ok(catalog.createView(ns, createViewRequest)).build()); } @Override @@ -453,8 +467,8 @@ public Response listViews( RealmContext realmContext, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok(newHandlerWrapper(realmContext, securityContext, prefix).listViews(ns)) - .build(); + return withCatalog( + securityContext, prefix, catalog -> Response.ok(catalog.listViews(ns)).build()); } @Override @@ -466,14 +480,18 @@ public Response loadCredentials( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - LoadTableResponse loadTableResponse = - newHandlerWrapper(realmContext, securityContext, prefix) - .loadTableWithAccessDelegation(tableIdentifier, "all"); - return Response.ok( - ImmutableLoadCredentialsResponse.builder() - .credentials(loadTableResponse.credentials()) - .build()) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + LoadTableResponse loadTableResponse = + catalog.loadTableWithAccessDelegation(tableIdentifier, "all"); + return Response.ok( + ImmutableLoadCredentialsResponse.builder() + .credentials(loadTableResponse.credentials()) + .build()) + .build(); + }); } @Override @@ -485,9 +503,8 @@ public Response loadView( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(view)); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix).loadView(tableIdentifier)) - .build(); + return withCatalog( + securityContext, prefix, catalog -> Response.ok(catalog.loadView(tableIdentifier)).build()); } @Override @@ -499,8 +516,13 @@ public Response viewExists( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(view)); - newHandlerWrapper(realmContext, securityContext, prefix).viewExists(tableIdentifier); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.viewExists(tableIdentifier); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -512,8 +534,13 @@ public Response dropView( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(view)); - newHandlerWrapper(realmContext, securityContext, prefix).dropView(tableIdentifier); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.dropView(tableIdentifier); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -522,8 +549,13 @@ public Response renameView( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - newHandlerWrapper(realmContext, securityContext, prefix).renameView(renameTableRequest); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.renameView(renameTableRequest); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -536,10 +568,10 @@ public Response replaceView( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(view)); - return Response.ok( - newHandlerWrapper(realmContext, securityContext, prefix) - .replaceView(tableIdentifier, commitViewRequest)) - .build(); + return withCatalog( + securityContext, + prefix, + catalog -> Response.ok(catalog.replaceView(tableIdentifier, commitViewRequest)).build()); } @Override @@ -548,9 +580,13 @@ public Response commitTransaction( CommitTransactionRequest commitTransactionRequest, RealmContext realmContext, SecurityContext securityContext) { - newHandlerWrapper(realmContext, securityContext, prefix) - .commitTransaction(commitTransactionRequest); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.commitTransaction(commitTransactionRequest); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } @Override @@ -574,9 +610,13 @@ public Response sendNotification( SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - newHandlerWrapper(realmContext, securityContext, prefix) - .sendNotification(tableIdentifier, notificationRequest); - return Response.status(Response.Status.NO_CONTENT).build(); + return withCatalog( + securityContext, + prefix, + catalog -> { + catalog.sendNotification(tableIdentifier, notificationRequest); + return Response.status(Response.Status.NO_CONTENT).build(); + }); } /** From IcebergRestConfigurationApiService. */ diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerWrapper.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerWrapper.java index c774525c84..543ba907c6 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerWrapper.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerWrapper.java @@ -22,7 +22,6 @@ import com.google.common.collect.Maps; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; -import java.io.IOException; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; @@ -32,7 +31,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; @@ -464,7 +462,7 @@ public ListNamespacesResponse listNamespaces(Namespace parent) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES; authorizeBasicNamespaceOperationOrThrow(op, parent); - return doCatalogOperation(() -> CatalogHandlers.listNamespaces(namespaceCatalog, parent)); + return CatalogHandlers.listNamespaces(namespaceCatalog, parent); } public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) { @@ -487,20 +485,17 @@ public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) { // For CreateNamespace, we consider this a special case in that the creator is able to // retrieve the latest namespace metadata for the duration of the CreateNamespace // operation, even if the entityVersion and/or grantsVersion update in the interim. - return doCatalogOperation( - () -> { - namespaceCatalog.createNamespace(namespace, request.properties()); - return CreateNamespaceResponse.builder() - .withNamespace(namespace) - .setProperties( - resolutionManifest - .getPassthroughResolvedPath(namespace) - .getRawLeafEntity() - .getPropertiesAsMap()) - .build(); - }); + namespaceCatalog.createNamespace(namespace, request.properties()); + return CreateNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties( + resolutionManifest + .getPassthroughResolvedPath(namespace) + .getRawLeafEntity() + .getPropertiesAsMap()) + .build(); } else { - return doCatalogOperation(() -> CatalogHandlers.createNamespace(namespaceCatalog, request)); + return CatalogHandlers.createNamespace(namespaceCatalog, request); } } @@ -509,37 +504,11 @@ private static boolean isExternal(CatalogEntity catalog) { catalog.getCatalogType()); } - private void doCatalogOperation(Runnable handler) { - doCatalogOperation( - () -> { - handler.run(); - return null; - }); - } - - /** - * Execute a catalog function and ensure we close the BaseCatalog afterward. This will typically - * ensure the underlying FileIO is closed - */ - private T doCatalogOperation(Supplier handler) { - try { - return handler.get(); - } finally { - if (baseCatalog instanceof Closeable closeable) { - try { - closeable.close(); - } catch (IOException e) { - LOGGER.error("Error while closing BaseCatalog", e); - } - } - } - } - public GetNamespaceResponse loadNamespaceMetadata(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_NAMESPACE_METADATA; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return doCatalogOperation(() -> CatalogHandlers.loadNamespace(namespaceCatalog, namespace)); + return CatalogHandlers.loadNamespace(namespaceCatalog, namespace); } public void namespaceExists(Namespace namespace) { @@ -554,14 +523,14 @@ public void namespaceExists(Namespace namespace) { authorizeBasicNamespaceOperationOrThrow(op, namespace); // TODO: Just skip CatalogHandlers for this one maybe - doCatalogOperation(() -> CatalogHandlers.loadNamespace(namespaceCatalog, namespace)); + CatalogHandlers.loadNamespace(namespaceCatalog, namespace); } public void dropNamespace(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_NAMESPACE; authorizeBasicNamespaceOperationOrThrow(op, namespace); - doCatalogOperation(() -> CatalogHandlers.dropNamespace(namespaceCatalog, namespace)); + CatalogHandlers.dropNamespace(namespaceCatalog, namespace); } public UpdateNamespacePropertiesResponse updateNamespaceProperties( @@ -569,15 +538,14 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties( PolarisAuthorizableOperation op = PolarisAuthorizableOperation.UPDATE_NAMESPACE_PROPERTIES; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return doCatalogOperation( - () -> CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request)); + return CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request); } public ListTablesResponse listTables(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return doCatalogOperation(() -> CatalogHandlers.listTables(baseCatalog, namespace)); + return CatalogHandlers.listTables(baseCatalog, namespace); } public LoadTableResponse createTableDirect(Namespace namespace, CreateTableRequest request) { @@ -594,7 +562,7 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque if (isExternal(catalog)) { throw new BadRequestException("Cannot create table on external catalogs."); } - return doCatalogOperation(() -> CatalogHandlers.createTable(baseCatalog, namespace, request)); + return CatalogHandlers.createTable(baseCatalog, namespace, request); } public LoadTableResponse createTableDirectWithWriteDelegation( @@ -613,55 +581,52 @@ public LoadTableResponse createTableDirectWithWriteDelegation( if (isExternal(catalog)) { throw new BadRequestException("Cannot create table on external catalogs."); } - return doCatalogOperation( - () -> { - request.validate(); - - TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); - if (baseCatalog.tableExists(tableIdentifier)) { - throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); - } - - Map properties = Maps.newHashMap(); - properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); - properties.putAll(request.properties()); - - Table table = - baseCatalog - .buildTable(tableIdentifier, request.schema()) - .withLocation(request.location()) - .withPartitionSpec(request.spec()) - .withSortOrder(request.writeOrder()) - .withProperties(properties) - .create(); - - if (table instanceof BaseTable baseTable) { - TableMetadata tableMetadata = baseTable.operations().current(); - LoadTableResponse.Builder responseBuilder = - LoadTableResponse.builder().withTableMetadata(tableMetadata); - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .addKeyValue("tableLocation", tableMetadata.location()) - .log("Fetching client credentials for table"); - responseBuilder.addAllConfig( - credentialDelegation.getCredentialConfig( - tableIdentifier, - tableMetadata, - Set.of( - PolarisStorageActions.READ, - PolarisStorageActions.WRITE, - PolarisStorageActions.LIST))); - } - return responseBuilder.build(); - } else if (table instanceof BaseMetadataTable) { - // metadata tables are loaded on the client side, return NoSuchTableException for now - throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); - } - - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - }); + request.validate(); + + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + if (baseCatalog.tableExists(tableIdentifier)) { + throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); + } + + Map properties = Maps.newHashMap(); + properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); + properties.putAll(request.properties()); + + Table table = + baseCatalog + .buildTable(tableIdentifier, request.schema()) + .withLocation(request.location()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(properties) + .create(); + + if (table instanceof BaseTable baseTable) { + TableMetadata tableMetadata = baseTable.operations().current(); + LoadTableResponse.Builder responseBuilder = + LoadTableResponse.builder().withTableMetadata(tableMetadata); + if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .addKeyValue("tableLocation", tableMetadata.location()) + .log("Fetching client credentials for table"); + responseBuilder.addAllConfig( + credentialDelegation.getCredentialConfig( + tableIdentifier, + tableMetadata, + Set.of( + PolarisStorageActions.READ, + PolarisStorageActions.WRITE, + PolarisStorageActions.LIST))); + } + return responseBuilder.build(); + } else if (table instanceof BaseMetadataTable) { + // metadata tables are loaded on the client side, return NoSuchTableException for now + throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } private TableMetadata stageTableCreateHelper(Namespace namespace, CreateTableRequest request) { @@ -721,11 +686,8 @@ public LoadTableResponse createTableStaged(Namespace namespace, CreateTableReque if (isExternal(catalog)) { throw new BadRequestException("Cannot create table on external catalogs."); } - return doCatalogOperation( - () -> { - TableMetadata metadata = stageTableCreateHelper(namespace, request); - return LoadTableResponse.builder().withTableMetadata(metadata).build(); - }); + TableMetadata metadata = stageTableCreateHelper(namespace, request); + return LoadTableResponse.builder().withTableMetadata(metadata).build(); } public LoadTableResponse createTableStagedWithWriteDelegation( @@ -744,26 +706,23 @@ public LoadTableResponse createTableStagedWithWriteDelegation( if (isExternal(catalog)) { throw new BadRequestException("Cannot create table on external catalogs."); } - return doCatalogOperation( - () -> { - TableIdentifier ident = TableIdentifier.of(namespace, request.name()); - TableMetadata metadata = stageTableCreateHelper(namespace, request); - - LoadTableResponse.Builder responseBuilder = - LoadTableResponse.builder().withTableMetadata(metadata); - - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", ident) - .addKeyValue("tableLocation", metadata.location()) - .log("Fetching client credentials for table"); - responseBuilder.addAllConfig( - credentialDelegation.getCredentialConfig( - ident, metadata, Set.of(PolarisStorageActions.ALL))); - } - return responseBuilder.build(); - }); + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + TableMetadata metadata = stageTableCreateHelper(namespace, request); + + LoadTableResponse.Builder responseBuilder = + LoadTableResponse.builder().withTableMetadata(metadata); + + if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", ident) + .addKeyValue("tableLocation", metadata.location()) + .log("Fetching client credentials for table"); + responseBuilder.addAllConfig( + credentialDelegation.getCredentialConfig( + ident, metadata, Set.of(PolarisStorageActions.ALL))); + } + return responseBuilder.build(); } public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) { @@ -771,7 +730,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest authorizeCreateTableLikeUnderNamespaceOperationOrThrow( op, TableIdentifier.of(namespace, request.name())); - return doCatalogOperation(() -> CatalogHandlers.registerTable(baseCatalog, namespace, request)); + return CatalogHandlers.registerTable(baseCatalog, namespace, request); } public boolean sendNotification(TableIdentifier identifier, NotificationRequest request) { @@ -816,7 +775,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_TABLE; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.TABLE, tableIdentifier); - return doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier)); + return CatalogHandlers.loadTable(baseCatalog, tableIdentifier); } public LoadTableResponse loadTableWithAccessDelegation( @@ -865,32 +824,29 @@ public LoadTableResponse loadTableWithAccessDelegation( // TODO: Find a way for the configuration or caller to better express whether to fail or omit // when data-access is specified but access delegation grants are not found. - return doCatalogOperation( - () -> { - Table table = baseCatalog.loadTable(tableIdentifier); - - if (table instanceof BaseTable baseTable) { - TableMetadata tableMetadata = baseTable.operations().current(); - LoadTableResponse.Builder responseBuilder = - LoadTableResponse.builder().withTableMetadata(tableMetadata); - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .addKeyValue("tableLocation", tableMetadata.location()) - .log("Fetching client credentials for table"); - responseBuilder.addAllConfig( - credentialDelegation.getCredentialConfig( - tableIdentifier, tableMetadata, actionsRequested)); - } - return responseBuilder.build(); - } else if (table instanceof BaseMetadataTable) { - // metadata tables are loaded on the client side, return NoSuchTableException for now - throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); - } - - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); - }); + Table table = baseCatalog.loadTable(tableIdentifier); + + if (table instanceof BaseTable baseTable) { + TableMetadata tableMetadata = baseTable.operations().current(); + LoadTableResponse.Builder responseBuilder = + LoadTableResponse.builder().withTableMetadata(tableMetadata); + if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .addKeyValue("tableLocation", tableMetadata.location()) + .log("Fetching client credentials for table"); + responseBuilder.addAllConfig( + credentialDelegation.getCredentialConfig( + tableIdentifier, tableMetadata, actionsRequested)); + } + return responseBuilder.build(); + } else if (table instanceof BaseMetadataTable) { + // metadata tables are loaded on the client side, return NoSuchTableException for now + throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) { @@ -931,9 +887,7 @@ public LoadTableResponse updateTable( if (isExternal(catalog)) { throw new BadRequestException("Cannot update table on external catalogs."); } - return doCatalogOperation( - () -> - CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request))); + return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public LoadTableResponse updateTableForStagedCreate( @@ -950,16 +904,14 @@ public LoadTableResponse updateTableForStagedCreate( if (isExternal(catalog)) { throw new BadRequestException("Cannot update table on external catalogs."); } - return doCatalogOperation( - () -> - CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request))); + return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_TABLE_WITHOUT_PURGE; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.TABLE, tableIdentifier); - doCatalogOperation(() -> CatalogHandlers.dropTable(baseCatalog, tableIdentifier)); + CatalogHandlers.dropTable(baseCatalog, tableIdentifier); } public void dropTableWithPurge(TableIdentifier tableIdentifier) { @@ -975,7 +927,7 @@ public void dropTableWithPurge(TableIdentifier tableIdentifier) { if (isExternal(catalog)) { throw new BadRequestException("Cannot drop table on external catalogs."); } - doCatalogOperation(() -> CatalogHandlers.purgeTable(baseCatalog, tableIdentifier)); + CatalogHandlers.purgeTable(baseCatalog, tableIdentifier); } public void tableExists(TableIdentifier tableIdentifier) { @@ -983,7 +935,7 @@ public void tableExists(TableIdentifier tableIdentifier) { authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.TABLE, tableIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier)); + CatalogHandlers.loadTable(baseCatalog, tableIdentifier); } public void renameTable(RenameTableRequest request) { @@ -1000,7 +952,7 @@ public void renameTable(RenameTableRequest request) { if (isExternal(catalog)) { throw new BadRequestException("Cannot rename table on external catalogs."); } - doCatalogOperation(() -> CatalogHandlers.renameTable(baseCatalog, request)); + CatalogHandlers.renameTable(baseCatalog, request); } public void commitTransaction(CommitTransactionRequest commitTransactionRequest) { @@ -1113,7 +1065,7 @@ public ListTablesResponse listViews(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return doCatalogOperation(() -> CatalogHandlers.listViews(viewCatalog, namespace)); + return CatalogHandlers.listViews(viewCatalog, namespace); } public LoadViewResponse createView(Namespace namespace, CreateViewRequest request) { @@ -1130,14 +1082,14 @@ public LoadViewResponse createView(Namespace namespace, CreateViewRequest reques if (isExternal(catalog)) { throw new BadRequestException("Cannot create view on external catalogs."); } - return doCatalogOperation(() -> CatalogHandlers.createView(viewCatalog, namespace, request)); + return CatalogHandlers.createView(viewCatalog, namespace, request); } public LoadViewResponse loadView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.VIEW, viewIdentifier); - return doCatalogOperation(() -> CatalogHandlers.loadView(viewCatalog, viewIdentifier)); + return CatalogHandlers.loadView(viewCatalog, viewIdentifier); } public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableRequest request) { @@ -1153,15 +1105,14 @@ public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableR if (isExternal(catalog)) { throw new BadRequestException("Cannot replace view on external catalogs."); } - return doCatalogOperation( - () -> CatalogHandlers.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request))); + return CatalogHandlers.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request)); } public void dropView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.VIEW, viewIdentifier); - doCatalogOperation(() -> CatalogHandlers.dropView(viewCatalog, viewIdentifier)); + CatalogHandlers.dropView(viewCatalog, viewIdentifier); } public void viewExists(TableIdentifier viewIdentifier) { @@ -1169,7 +1120,7 @@ public void viewExists(TableIdentifier viewIdentifier) { authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.VIEW, viewIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - doCatalogOperation(() -> CatalogHandlers.loadView(viewCatalog, viewIdentifier)); + CatalogHandlers.loadView(viewCatalog, viewIdentifier); } public void renameView(RenameTableRequest request) { @@ -1186,9 +1137,13 @@ public void renameView(RenameTableRequest request) { if (isExternal(catalog)) { throw new BadRequestException("Cannot rename view on external catalogs."); } - doCatalogOperation(() -> CatalogHandlers.renameView(viewCatalog, request)); + CatalogHandlers.renameView(viewCatalog, request); } @Override - public void close() throws Exception {} + public void close() throws Exception { + if (baseCatalog instanceof Closeable closeable) { + closeable.close(); + } + } } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index f14b27a391..c763af661c 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -177,9 +177,6 @@ public Map contextVariables() { callContextFactory, entityManager, metaStoreManager, - metaStoreSession, - configurationStore, - polarisDiagnostics, authorizer, new DefaultCatalogPrefixParser());