Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.rest.ErrorHandler;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;

/**
Expand Down Expand Up @@ -149,6 +153,24 @@ public void dropTable(String catalog, TableIdentifier id) {
}
}

public LoadTableResponse loadTable(String catalog, TableIdentifier id, String snapshots) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
request(
"v1/{cat}/namespaces/" + ns + "/tables/{table}",
Map.of("cat", catalog, "table", id.name()),
snapshots == null ? Map.of() : Map.of("snapshots", snapshots))
.get()) {
if (res.getStatus() == Response.Status.OK.getStatusCode()) {
return res.readEntity(LoadTableResponse.class);
}
throw new RESTException(
"Unhandled error: %s",
((ErrorHandler) ErrorHandlers.defaultErrorHandler())
.parseResponse(res.getStatus(), res.readEntity(String.class)));
}
}

public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
String ns = RESTUtil.encodeNamespace(namespace);
try (Response res =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.polaris.service.it.test;

import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -58,6 +60,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTCatalog;
Expand Down Expand Up @@ -1267,8 +1270,7 @@ public void testDropGenericTable() {

genericTableApi.dropGenericTable(currentCatalogName, tableIdentifier);

Assertions.assertThatCode(
() -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier))
assertThatCode(() -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier))
.isInstanceOf(ProcessingException.class);

genericTableApi.purge(currentCatalogName, namespace);
Expand Down Expand Up @@ -1343,4 +1345,61 @@ public void testDropNonExistingGenericTable() {

genericTableApi.purge(currentCatalogName, namespace);
}

@Test
public void testLoadTableWithSnapshots() {
Namespace namespace = Namespace.of("ns1");
restCatalog.createNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
restCatalog.createTable(tableIdentifier, SCHEMA);

assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "ALL"))
.doesNotThrowAnyException();
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "all"))
.doesNotThrowAnyException();
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "refs"))
.doesNotThrowAnyException();
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "REFS"))
.doesNotThrowAnyException();
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "not-real"))
.isInstanceOf(RESTException.class)
.hasMessageContaining("Unrecognized snapshots")
.hasMessageContaining("code=" + BAD_REQUEST.getStatusCode());

catalogApi.purge(currentCatalogName, namespace);
}

@Test
public void testLoadTableWithRefFiltering() {
Namespace namespace = Namespace.of("ns1");
restCatalog.createNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");

restCatalog.createTable(tableIdentifier, SCHEMA);

Table table = restCatalog.loadTable(tableIdentifier);

// Create an orphaned snapshot:
table.newAppend().appendFile(FILE_A).commit();
long snapshotIdA = table.currentSnapshot().snapshotId();
table.newAppend().appendFile(FILE_B).commit();
table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit();

var allSnapshots =
catalogApi
.loadTable(currentCatalogName, tableIdentifier, "ALL")
.tableMetadata()
.snapshots();
assertThat(allSnapshots).hasSize(2);

var refsSnapshots =
catalogApi
.loadTable(currentCatalogName, tableIdentifier, "REFS")
.tableMetadata()
.snapshots();
assertThat(refsSnapshots).hasSize(1);
assertThat(refsSnapshots.getFirst().snapshotId()).isEqualTo(snapshotIdA);

catalogApi.purge(currentCatalogName, namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import jakarta.annotation.Nonnull;
import jakarta.ws.rs.core.SecurityContext;
import java.io.Closeable;
import java.time.OffsetDateTime;
Expand All @@ -36,6 +37,7 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
protected SupportsNamespaces namespaceCatalog = null;
protected ViewCatalog viewCatalog = null;

public static final String SNAPSHOTS_ALL = "all";
public static final String SNAPSHOTS_REFS = "refs";

public IcebergCatalogHandler(
CallContext callContext,
PolarisEntityManager entityManager,
Expand Down Expand Up @@ -380,7 +385,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST))
PolarisStorageActions.LIST),
SNAPSHOTS_ALL)
.build();
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
Expand Down Expand Up @@ -471,7 +477,7 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
TableMetadata metadata = stageTableCreateHelper(namespace, request);

return buildLoadTableResponseWithDelegationCredentials(
ident, metadata, Set.of(PolarisStorageActions.ALL))
ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
.build();
}

Expand Down Expand Up @@ -580,7 +586,8 @@ public Optional<LoadTableResponse> loadTableIfStale(
}
}

return Optional.of(CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier);
return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
}

public LoadTableResponse loadTableWithAccessDelegation(
Expand Down Expand Up @@ -679,7 +686,7 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
TableMetadata tableMetadata = baseTable.operations().current();
return Optional.of(
buildLoadTableResponseWithDelegationCredentials(
tableIdentifier, tableMetadata, actionsRequested)
tableIdentifier, tableMetadata, actionsRequested, snapshots)
.build());
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
Expand All @@ -692,7 +699,8 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials(
TableIdentifier tableIdentifier,
TableMetadata tableMetadata,
Set<PolarisStorageActions> actions) {
Set<PolarisStorageActions> actions,
String snapshots) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
Expand Down Expand Up @@ -1010,6 +1018,31 @@ public void renameView(RenameTableRequest request) {
CatalogHandlers.renameView(viewCatalog, request);
}

private @Nonnull LoadTableResponse filterResponseToSnapshots(
LoadTableResponse loadTableResponse, String snapshots) {
if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) {
return loadTableResponse;
} else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) {
TableMetadata metadata = loadTableResponse.tableMetadata();

Set<Long> referencedSnapshotIds =
metadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());

TableMetadata filteredMetadata =
metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId()));

return LoadTableResponse.builder()
.withTableMetadata(filteredMetadata)
.addAllConfig(loadTableResponse.config())
.addAllCredentials(loadTableResponse.credentials())
.build();
} else {
throw new IllegalArgumentException("Unrecognized snapshots: " + snapshots);
}
}

@Override
public void close() throws Exception {
if (baseCatalog instanceof Closeable closeable) {
Expand Down