Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
39e0ea4
Pulled in iceberg 1.8.0 spec changes for freshness aware table loadin…
mansehajsingh Feb 18, 2025
0ef4de8
Changed etag support to use entityId:version tuple
mansehajsingh Feb 26, 2025
bb82f56
Merge remote-tracking branch 'origin/main' into implement-freshness-a…
mansehajsingh Feb 26, 2025
44b1859
fixed getresponse call
mansehajsingh Feb 26, 2025
ab27b82
Merge remote-tracking branch 'origin/main' into implement-freshness-a…
mansehajsingh Feb 26, 2025
be3da8d
Changed etagged response to record and gave default implementation to…
mansehajsingh Feb 27, 2025
e96bbdb
Made iceberg rest spec docs clearer
mansehajsingh Feb 27, 2025
772c715
Added HTTP Compliant ETag and IfNoneMatch representations and separat…
mansehajsingh Feb 28, 2025
662f24a
Changed ETag to be a record and improved semantics of IfNoneMatch
mansehajsingh Feb 28, 2025
652cc08
Fixed semantics of if none match
mansehajsingh Feb 28, 2025
2c4fd1b
Removed ETag representation, consolidated in IfNoneMatch
mansehajsingh Mar 1, 2025
3b4e553
fixed if none match parsing
mansehajsingh Mar 4, 2025
88e75ed
Added table entity retrieval method to table operations
mansehajsingh Mar 19, 2025
c60ffa9
Merged main
mansehajsingh Mar 19, 2025
ae7e827
removed accidental commit of pycache folders
mansehajsingh Mar 19, 2025
8aba177
Merged in main and fixed conflicts with generic table support
mansehajsingh Mar 19, 2025
63c28ba
Fixed merge conflicts
mansehajsingh Mar 20, 2025
448e2d4
Fixed formatting
mansehajsingh Mar 20, 2025
8f970c4
Changed to use metadata location hash
mansehajsingh Mar 26, 2025
e8ccc76
Ran formatting
mansehajsingh Mar 26, 2025
6b25a15
use sha256
mansehajsingh Mar 26, 2025
49659f1
Moved out ETag functions to utility class and removed ETaggedLoadTabl…
mansehajsingh Mar 26, 2025
d6fd55c
Addressed comments
mansehajsingh Apr 1, 2025
b5175d7
Merge branch 'main' into implement-freshness-aware-table-loading
mansehajsingh Apr 1, 2025
6e23473
Merge branch 'main' into implement-freshness-aware-table-loading
mansehajsingh Apr 1, 2025
183b8d7
Fixed IcebergTableLikeEntity package rename
mansehajsingh Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -646,6 +649,154 @@ public void testLoadTableWithAccessDelegationForExternalCatalogWithConfigEnabled
}
}

/**
* Register a table. Then, invoke an initial loadTable request to fetch and ensure ETag is
* present. Then, invoke a second loadTable to ensure that ETag is matched.
*/
@Test
@Disabled("Enable once ETag support is available in the API for loadTable.")
public void testLoadTableTwiceWithETag() {
// TODO: Re-enable test once spec is up to date with ETag change for loadTable in Iceberg

Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));
restCatalog.registerTable(TableIdentifier.of(ns1, "my_table_etagged"), fileLocation);
Invocation invocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_table_etagged")
.build("GET");
try (Response initialLoadTable = invocation.invoke()) {
assertThat(initialLoadTable.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = initialLoadTable.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_table_etagged")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus())
.isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

/**
* Invoke an initial registerTable request to fetch and ensure ETag is present. Then, invoke a
* second loadTable to ensure that ETag is matched.
*/
@Test
@Disabled("Enable once ETag support is available in the API for loadTable.")
public void testRegisterAndLoadTableWithReturnedETag() {
// TODO: Re-enable test once spec is up to date with ETag change for loadTable in Iceberg

Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));

Invocation registerInvocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/register")
.buildPost(
Entity.json(
Map.of("name", "my_etagged_table", "metadata-location", fileLocation)));
try (Response registerResponse = registerInvocation.invoke()) {
assertThat(registerResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = registerResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus())
.isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}

} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

@Test
@Disabled("Enable once ETag support is available in the API for loadTable.")
public void testCreateAndLoadTableWithReturnedEtag() {
// TODO: Re-enable test once spec is up to date with ETag change for loadTable in Iceberg

Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));

Invocation createInvocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables")
.buildPost(
Entity.json(
CreateTableRequest.builder()
.withName("my_etagged_table")
.withLocation(tableMetadata.location())
.withPartitionSpec(tableMetadata.spec())
.withSchema(tableMetadata.schema())
.withWriteOrder(tableMetadata.sortOrder())
.build()));
try (Response createResponse = createInvocation.invoke()) {
assertThat(createResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation =
catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus())
.isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

@Test
public void testSendNotificationInternalCatalog() {
Map<String, String> payload =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
Expand Down Expand Up @@ -865,6 +866,35 @@ public void testLoadTableInsufficientPermissions() {
() -> newWrapper().loadTable(TABLE_NS1A_2, "all"));
}

@Test
public void testLoadTableIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() ->
newWrapper().loadTableIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() ->
newWrapper()
.loadTableIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testLoadTableWithReadAccessDelegationSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down Expand Up @@ -920,6 +950,73 @@ public void testLoadTableWithWriteAccessDelegationInsufficientPermissions() {
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
}

@Test
public void testLoadTableWithReadAccessDelegationIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() ->
newWrapper()
.loadTableWithAccessDelegationIfStale(
TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableWithReadAccessDelegationIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() ->
newWrapper()
.loadTableWithAccessDelegationIfStale(
TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testLoadTableWithWriteAccessDelegationIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
// TODO: Once we give different creds for read/write privilege, move this
// TABLE_READ_DATA into a special-case test; with only TABLE_READ_DATA we'd expet
// to receive a read-only credential.
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() ->
newWrapper()
.loadTableWithAccessDelegationIfStale(
TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableWithWriteAccessDelegationIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() ->
newWrapper()
.loadTableWithAccessDelegationIfStale(
TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testUpdateTableSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableSet;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.net.URLEncoder;
Expand Down Expand Up @@ -70,6 +72,8 @@
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.types.CommitTableRequest;
import org.apache.polaris.service.types.CommitViewRequest;
import org.apache.polaris.service.types.NotificationRequest;
Expand Down Expand Up @@ -307,9 +311,21 @@ public Response createTable(
.build();
}
} else if (delegationModes.isEmpty()) {
return Response.ok(catalog.createTableDirect(ns, createTableRequest)).build();
LoadTableResponse response = catalog.createTableDirect(ns, createTableRequest);
return Response.ok(response)
.header(
HttpHeaders.ETAG,
IcebergHttpUtil.generateETagForMetadataFileLocation(
response.metadataLocation()))
.build();
} else {
return Response.ok(catalog.createTableDirectWithWriteDelegation(ns, createTableRequest))
LoadTableResponse response =
catalog.createTableDirectWithWriteDelegation(ns, createTableRequest);
return Response.ok(response)
.header(
HttpHeaders.ETAG,
IcebergHttpUtil.generateETagForMetadataFileLocation(
response.metadataLocation()))
.build();
}
});
Expand Down Expand Up @@ -341,16 +357,38 @@ public Response loadTable(
parseAccessDelegationModes(accessDelegationMode);
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table));

// TODO: Populate with header value from parameter once the generated interface
// contains the if-none-match header
IfNoneMatch ifNoneMatch = IfNoneMatch.fromHeader(null);

if (ifNoneMatch.isWildcard()) {
throw new BadRequestException("If-None-Match may not take the value of '*'");
}

return withCatalog(
securityContext,
prefix,
catalog -> {
LoadTableResponse response;

if (delegationModes.isEmpty()) {
return Response.ok(catalog.loadTable(tableIdentifier, snapshots)).build();
response =
catalog
.loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));
} else {
return Response.ok(catalog.loadTableWithAccessDelegation(tableIdentifier, snapshots))
.build();
response =
catalog
.loadTableWithAccessDelegationIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));
}

return Response.ok(response)
.header(
HttpHeaders.ETAG,
IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation()))
.build();
});
}

Expand Down Expand Up @@ -406,7 +444,15 @@ public Response registerTable(
return withCatalog(
securityContext,
prefix,
catalog -> Response.ok(catalog.registerTable(ns, registerTableRequest)).build());
catalog -> {
LoadTableResponse response = catalog.registerTable(ns, registerTableRequest);

return Response.ok(response)
.header(
HttpHeaders.ETAG,
IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation()))
.build();
});
}

@Override
Expand Down
Loading