Skip to content

Commit fdb59b0

Browse files
eric-maynardadutra
andauthored
Support snapshots=refs (#1405)
* initial commit * autolint * small revert * rebase * autolint * simpler * autolint * tests * autolint * stable * fix leak * ready for review * improved test * autolint * logic flip again * Update service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java Co-authored-by: Alexandre Dutra <[email protected]> * Update integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java Co-authored-by: Alexandre Dutra <[email protected]> * adjustments for committed suggestions * autolint --------- Co-authored-by: Alexandre Dutra <[email protected]>
1 parent bfe599e commit fdb59b0

File tree

3 files changed

+121
-7
lines changed

3 files changed

+121
-7
lines changed

integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@
3434
import java.util.Map;
3535
import org.apache.iceberg.catalog.Namespace;
3636
import org.apache.iceberg.catalog.TableIdentifier;
37+
import org.apache.iceberg.exceptions.RESTException;
38+
import org.apache.iceberg.rest.ErrorHandler;
39+
import org.apache.iceberg.rest.ErrorHandlers;
3740
import org.apache.iceberg.rest.RESTUtil;
3841
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
3942
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
4043
import org.apache.iceberg.rest.responses.ListTablesResponse;
44+
import org.apache.iceberg.rest.responses.LoadTableResponse;
4145
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
4246

4347
/**
@@ -149,6 +153,24 @@ public void dropTable(String catalog, TableIdentifier id) {
149153
}
150154
}
151155

156+
public LoadTableResponse loadTable(String catalog, TableIdentifier id, String snapshots) {
157+
String ns = RESTUtil.encodeNamespace(id.namespace());
158+
try (Response res =
159+
request(
160+
"v1/{cat}/namespaces/" + ns + "/tables/{table}",
161+
Map.of("cat", catalog, "table", id.name()),
162+
snapshots == null ? Map.of() : Map.of("snapshots", snapshots))
163+
.get()) {
164+
if (res.getStatus() == Response.Status.OK.getStatusCode()) {
165+
return res.readEntity(LoadTableResponse.class);
166+
}
167+
throw new RESTException(
168+
"Unhandled error: %s",
169+
((ErrorHandler) ErrorHandlers.defaultErrorHandler())
170+
.parseResponse(res.getStatus(), res.readEntity(String.class)));
171+
}
172+
}
173+
152174
public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
153175
String ns = RESTUtil.encodeNamespace(namespace);
154176
try (Response res =

integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
package org.apache.polaris.service.it.test;
2020

2121
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
22+
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
2223
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
2324
import static org.assertj.core.api.Assertions.assertThat;
25+
import static org.assertj.core.api.Assertions.assertThatCode;
2426
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2527

2628
import com.google.common.collect.ImmutableMap;
@@ -58,6 +60,7 @@
5860
import org.apache.iceberg.catalog.TableIdentifier;
5961
import org.apache.iceberg.exceptions.CommitFailedException;
6062
import org.apache.iceberg.exceptions.ForbiddenException;
63+
import org.apache.iceberg.exceptions.RESTException;
6164
import org.apache.iceberg.expressions.Expressions;
6265
import org.apache.iceberg.io.ResolvingFileIO;
6366
import org.apache.iceberg.rest.RESTCatalog;
@@ -1267,8 +1270,7 @@ public void testDropGenericTable() {
12671270

12681271
genericTableApi.dropGenericTable(currentCatalogName, tableIdentifier);
12691272

1270-
Assertions.assertThatCode(
1271-
() -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier))
1273+
assertThatCode(() -> genericTableApi.getGenericTable(currentCatalogName, tableIdentifier))
12721274
.isInstanceOf(ProcessingException.class);
12731275

12741276
genericTableApi.purge(currentCatalogName, namespace);
@@ -1343,4 +1345,61 @@ public void testDropNonExistingGenericTable() {
13431345

13441346
genericTableApi.purge(currentCatalogName, namespace);
13451347
}
1348+
1349+
@Test
1350+
public void testLoadTableWithSnapshots() {
1351+
Namespace namespace = Namespace.of("ns1");
1352+
restCatalog.createNamespace(namespace);
1353+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
1354+
restCatalog.createTable(tableIdentifier, SCHEMA);
1355+
1356+
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "ALL"))
1357+
.doesNotThrowAnyException();
1358+
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "all"))
1359+
.doesNotThrowAnyException();
1360+
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "refs"))
1361+
.doesNotThrowAnyException();
1362+
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "REFS"))
1363+
.doesNotThrowAnyException();
1364+
assertThatCode(() -> catalogApi.loadTable(currentCatalogName, tableIdentifier, "not-real"))
1365+
.isInstanceOf(RESTException.class)
1366+
.hasMessageContaining("Unrecognized snapshots")
1367+
.hasMessageContaining("code=" + BAD_REQUEST.getStatusCode());
1368+
1369+
catalogApi.purge(currentCatalogName, namespace);
1370+
}
1371+
1372+
@Test
1373+
public void testLoadTableWithRefFiltering() {
1374+
Namespace namespace = Namespace.of("ns1");
1375+
restCatalog.createNamespace(namespace);
1376+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
1377+
1378+
restCatalog.createTable(tableIdentifier, SCHEMA);
1379+
1380+
Table table = restCatalog.loadTable(tableIdentifier);
1381+
1382+
// Create an orphaned snapshot:
1383+
table.newAppend().appendFile(FILE_A).commit();
1384+
long snapshotIdA = table.currentSnapshot().snapshotId();
1385+
table.newAppend().appendFile(FILE_B).commit();
1386+
table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit();
1387+
1388+
var allSnapshots =
1389+
catalogApi
1390+
.loadTable(currentCatalogName, tableIdentifier, "ALL")
1391+
.tableMetadata()
1392+
.snapshots();
1393+
assertThat(allSnapshots).hasSize(2);
1394+
1395+
var refsSnapshots =
1396+
catalogApi
1397+
.loadTable(currentCatalogName, tableIdentifier, "REFS")
1398+
.tableMetadata()
1399+
.snapshots();
1400+
assertThat(refsSnapshots).hasSize(1);
1401+
assertThat(refsSnapshots.getFirst().snapshotId()).isEqualTo(snapshotIdA);
1402+
1403+
catalogApi.purge(currentCatalogName, namespace);
1404+
}
13461405
}

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.base.Preconditions;
2222
import com.google.common.collect.Maps;
23+
import jakarta.annotation.Nonnull;
2324
import jakarta.ws.rs.core.SecurityContext;
2425
import java.io.Closeable;
2526
import java.time.OffsetDateTime;
@@ -36,6 +37,7 @@
3637
import org.apache.iceberg.BaseTable;
3738
import org.apache.iceberg.MetadataUpdate;
3839
import org.apache.iceberg.PartitionSpec;
40+
import org.apache.iceberg.SnapshotRef;
3941
import org.apache.iceberg.SortOrder;
4042
import org.apache.iceberg.Table;
4143
import org.apache.iceberg.TableMetadata;
@@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
127129
protected SupportsNamespaces namespaceCatalog = null;
128130
protected ViewCatalog viewCatalog = null;
129131

132+
public static final String SNAPSHOTS_ALL = "all";
133+
public static final String SNAPSHOTS_REFS = "refs";
134+
130135
public IcebergCatalogHandler(
131136
CallContext callContext,
132137
PolarisEntityManager entityManager,
@@ -380,7 +385,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
380385
Set.of(
381386
PolarisStorageActions.READ,
382387
PolarisStorageActions.WRITE,
383-
PolarisStorageActions.LIST))
388+
PolarisStorageActions.LIST),
389+
SNAPSHOTS_ALL)
384390
.build();
385391
} else if (table instanceof BaseMetadataTable) {
386392
// metadata tables are loaded on the client side, return NoSuchTableException for now
@@ -471,7 +477,7 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
471477
TableMetadata metadata = stageTableCreateHelper(namespace, request);
472478

473479
return buildLoadTableResponseWithDelegationCredentials(
474-
ident, metadata, Set.of(PolarisStorageActions.ALL))
480+
ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
475481
.build();
476482
}
477483

@@ -580,7 +586,8 @@ public Optional<LoadTableResponse> loadTableIfStale(
580586
}
581587
}
582588

583-
return Optional.of(CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
589+
LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier);
590+
return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
584591
}
585592

586593
public LoadTableResponse loadTableWithAccessDelegation(
@@ -679,7 +686,7 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
679686
TableMetadata tableMetadata = baseTable.operations().current();
680687
return Optional.of(
681688
buildLoadTableResponseWithDelegationCredentials(
682-
tableIdentifier, tableMetadata, actionsRequested)
689+
tableIdentifier, tableMetadata, actionsRequested, snapshots)
683690
.build());
684691
} else if (table instanceof BaseMetadataTable) {
685692
// metadata tables are loaded on the client side, return NoSuchTableException for now
@@ -692,7 +699,8 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
692699
private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials(
693700
TableIdentifier tableIdentifier,
694701
TableMetadata tableMetadata,
695-
Set<PolarisStorageActions> actions) {
702+
Set<PolarisStorageActions> actions,
703+
String snapshots) {
696704
LoadTableResponse.Builder responseBuilder =
697705
LoadTableResponse.builder().withTableMetadata(tableMetadata);
698706
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
@@ -1010,6 +1018,31 @@ public void renameView(RenameTableRequest request) {
10101018
CatalogHandlers.renameView(viewCatalog, request);
10111019
}
10121020

1021+
private @Nonnull LoadTableResponse filterResponseToSnapshots(
1022+
LoadTableResponse loadTableResponse, String snapshots) {
1023+
if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) {
1024+
return loadTableResponse;
1025+
} else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) {
1026+
TableMetadata metadata = loadTableResponse.tableMetadata();
1027+
1028+
Set<Long> referencedSnapshotIds =
1029+
metadata.refs().values().stream()
1030+
.map(SnapshotRef::snapshotId)
1031+
.collect(Collectors.toSet());
1032+
1033+
TableMetadata filteredMetadata =
1034+
metadata.removeSnapshotsIf(s -> !referencedSnapshotIds.contains(s.snapshotId()));
1035+
1036+
return LoadTableResponse.builder()
1037+
.withTableMetadata(filteredMetadata)
1038+
.addAllConfig(loadTableResponse.config())
1039+
.addAllCredentials(loadTableResponse.credentials())
1040+
.build();
1041+
} else {
1042+
throw new IllegalArgumentException("Unrecognized snapshots: " + snapshots);
1043+
}
1044+
}
1045+
10131046
@Override
10141047
public void close() throws Exception {
10151048
if (baseCatalog instanceof Closeable closeable) {

0 commit comments

Comments
 (0)