diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java index 8312e488bb..7be67f1947 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java @@ -34,10 +34,14 @@ import java.util.Map; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.rest.ErrorHandler; +import org.apache.iceberg.rest.ErrorHandlers; import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.responses.ListNamespacesResponse; import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; /** @@ -149,6 +153,24 @@ public void dropTable(String catalog, TableIdentifier id) { } } + public LoadTableResponse loadTable(String catalog, TableIdentifier id, String snapshots) { + String ns = RESTUtil.encodeNamespace(id.namespace()); + try (Response res = + request( + "v1/{cat}/namespaces/" + ns + "/tables/{table}", + Map.of("cat", catalog, "table", id.name()), + snapshots == null ? Map.of() : Map.of("snapshots", snapshots)) + .get()) { + if (res.getStatus() == Response.Status.OK.getStatusCode()) { + return res.readEntity(LoadTableResponse.class); + } + throw new RESTException( + "Unhandled error: %s", + ((ErrorHandler) ErrorHandlers.defaultErrorHandler()) + .parseResponse(res.getStatus(), res.readEntity(String.class))); + } + } + public List listViews(String catalog, Namespace namespace) { String ns = RESTUtil.encodeNamespace(namespace); try (Response res = diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java index 33bc779541..7b02a9baad 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java @@ -19,8 +19,10 @@ package org.apache.polaris.service.it.test; import static jakarta.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.ImmutableMap; @@ -58,6 +60,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.rest.RESTCatalog; @@ -1267,8 +1270,7 @@ public void testDropGenericTable() { genericTableApi.dropGenericTable(currentCatalogName, tableIdentifier); - Assertions.assertThatCode( - () -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier)) + assertThatCode(() -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier)) .isInstanceOf(ProcessingException.class); genericTableApi.purge(currentCatalogName, namespace); @@ -1343,4 +1345,61 @@ public void testDropNonExistingGenericTable() { genericTableApi.purge(currentCatalogName, namespace); } + + @Test + public void testLoadTableWithSnapshots() { + Namespace namespace = Namespace.of("ns1"); + restCatalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1"); + restCatalog.createTable(tableIdentifier, SCHEMA); + + assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "ALL")) + .doesNotThrowAnyException(); + assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "all")) + .doesNotThrowAnyException(); + assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "refs")) + .doesNotThrowAnyException(); + assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "REFS")) + .doesNotThrowAnyException(); + assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "not-real")) + .isInstanceOf(RESTException.class) + .hasMessageContaining("Unrecognized snapshots") + .hasMessageContaining("code=" + BAD_REQUEST.getStatusCode()); + + catalogApi.purge(currentCatalogName, namespace); + } + + @Test + public void testLoadTableWithRefFiltering() { + Namespace namespace = Namespace.of("ns1"); + restCatalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1"); + + restCatalog.createTable(tableIdentifier, SCHEMA); + + Table table = restCatalog.loadTable(tableIdentifier); + + // Create an orphaned snapshot: + table.newAppend().appendFile(FILE_A).commit(); + long snapshotIdA = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit(); + + var allSnapshots = + catalogApi + .loadTable(currentCatalogName, tableIdentifier, "ALL") + .tableMetadata() + .snapshots(); + assertThat(allSnapshots).hasSize(2); + + var refsSnapshots = + catalogApi + .loadTable(currentCatalogName, tableIdentifier, "REFS") + .tableMetadata() + .snapshots(); + assertThat(refsSnapshots).hasSize(1); + assertThat(refsSnapshots.getFirst().snapshotId()).isEqualTo(snapshotIdA); + + catalogApi.purge(currentCatalogName, namespace); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 716d68fdbc..e9eb5c4f72 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import jakarta.annotation.Nonnull; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.time.OffsetDateTime; @@ -36,6 +37,7 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab protected SupportsNamespaces namespaceCatalog = null; protected ViewCatalog viewCatalog = null; + public static final String SNAPSHOTS_ALL = "all"; + public static final String SNAPSHOTS_REFS = "refs"; + public IcebergCatalogHandler( CallContext callContext, PolarisEntityManager entityManager, @@ -380,7 +385,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation( Set.of( PolarisStorageActions.READ, PolarisStorageActions.WRITE, - PolarisStorageActions.LIST)) + PolarisStorageActions.LIST), + SNAPSHOTS_ALL) .build(); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -471,7 +477,7 @@ public LoadTableResponse createTableStagedWithWriteDelegation( TableMetadata metadata = stageTableCreateHelper(namespace, request); return buildLoadTableResponseWithDelegationCredentials( - ident, metadata, Set.of(PolarisStorageActions.ALL)) + ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL) .build(); } @@ -580,7 +586,8 @@ public Optional loadTableIfStale( } } - return Optional.of(CatalogHandlers.loadTable(baseCatalog, tableIdentifier)); + LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + return Optional.of(filterResponseToSnapshots(rawResponse, snapshots)); } public LoadTableResponse loadTableWithAccessDelegation( @@ -679,7 +686,7 @@ public Optional loadTableWithAccessDelegationIfStale( TableMetadata tableMetadata = baseTable.operations().current(); return Optional.of( buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, tableMetadata, actionsRequested) + tableIdentifier, tableMetadata, actionsRequested, snapshots) .build()); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -692,7 +699,8 @@ public Optional loadTableWithAccessDelegationIfStale( private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set actions) { + Set actions, + String snapshots) { LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { @@ -1010,6 +1018,31 @@ public void renameView(RenameTableRequest request) { CatalogHandlers.renameView(viewCatalog, request); } + private @Nonnull LoadTableResponse filterResponseToSnapshots( + LoadTableResponse loadTableResponse, String snapshots) { + if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) { + return loadTableResponse; + } else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) { + TableMetadata metadata = loadTableResponse.tableMetadata(); + + Set referencedSnapshotIds = + metadata.refs().values().stream() + .map(SnapshotRef::snapshotId) + .collect(Collectors.toSet()); + + TableMetadata filteredMetadata = + metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId())); + + return LoadTableResponse.builder() + .withTableMetadata(filteredMetadata) + .addAllConfig(loadTableResponse.config()) + .addAllCredentials(loadTableResponse.credentials()) + .build(); + } else { + throw new IllegalArgumentException("Unrecognized snapshots: " + snapshots); + } + } + @Override public void close() throws Exception { if (baseCatalog instanceof Closeable closeable) {