From 552c9687c02ff70b3c11585aea332298df1f0900 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Thu, 18 Sep 2025 17:08:53 -0400 Subject: [PATCH 1/3] Always propagate non-credential properties from AccessConfig to clients This change builds on top of #2589 and further prepares Polaris code to support non-STS S3 implementations for #2589. For S3 implementations that do have STS, this change enables clients to run with local credentials (no credential vending) and still receive endpoint configuration from the catalog. * Call `SupportsCredentialDelegation.getAccessConfig()` on all relevant create/load requests (previously it was called only when `vended-credentials` was requested * Always sent `AccessConfig.extraProperties()` to clients * Expose credentials to clients only when the `vended-credentials` access delegation mode is requested. * There is not client-visible behaviour change for implementations of `PolarisStorageIntegration` that do not produce "extra" `AccessConfig` properties. --- .../service/it/RestCatalogMinIOSpecialIT.java | 98 ++++++++++++++----- .../iceberg/IcebergCatalogHandler.java | 10 +- .../apache/polaris/service/TestServices.java | 23 ++++- 3 files changed, 94 insertions(+), 37 deletions(-) diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 80845608da..97bac5eeda 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -19,8 +19,13 @@ package org.apache.polaris.service.it; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; import static org.assertj.core.api.Assertions.assertThat; @@ -42,7 +47,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -57,6 +61,7 @@ import org.apache.polaris.core.admin.model.PolarisCatalog; import org.apache.polaris.core.admin.model.PrincipalWithCredentials; import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.catalog.AccessDelegationMode; import org.apache.polaris.service.it.env.CatalogApi; import org.apache.polaris.service.it.env.ClientCredentials; import org.apache.polaris.service.it.env.ManagementApi; @@ -74,7 +79,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -112,7 +117,6 @@ public Map getConfigOverrides() { required(1, "id", Types.IntegerType.get(), "doc"), optional(2, "data", Types.StringType.get())); - private static ClientCredentials adminCredentials; private static PolarisApiEndpoints endpoints; private static PolarisClient client; private static ManagementApi managementApi; @@ -131,7 +135,6 @@ static void setup( @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess, ClientCredentials credentials) { s3Client = minioAccess.s3Client(); - adminCredentials = credentials; endpoints = apiEndpoints; client = polarisClient(endpoints); adminToken = client.obtainToken(credentials); @@ -158,15 +161,19 @@ public void before(TestInfo testInfo) { } private RESTCatalog createCatalog( - Optional endpoint, Optional stsEndpoint, boolean pathStyleAccess) { - return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty()); + Optional endpoint, + Optional stsEndpoint, + boolean pathStyleAccess, + Optional delegationMode) { + return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty(), delegationMode); } private RESTCatalog createCatalog( Optional endpoint, Optional stsEndpoint, boolean pathStyleAccess, - Optional endpointInternal) { + Optional endpointInternal, + Optional delegationMode) { AwsStorageConfigInfo.Builder storageConfig = AwsStorageConfigInfo.builder() .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) @@ -198,8 +205,16 @@ private RESTCatalog createCatalog( org.apache.iceberg.CatalogProperties.URI, endpoints.catalogApiEndpoint().toString()) .put(OAuth2Properties.TOKEN, authToken) .put("warehouse", catalogName) - .putAll(endpoints.extraHeaders("header.")) - .put("header.X-Iceberg-Access-Delegation", "vended-credentials"); + .putAll(endpoints.extraHeaders("header.")); + + delegationMode.ifPresent( + dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation", dm.protocolValue())); + + if (delegationMode.isEmpty()) { + // Use local credentials on the client side + propertiesBuilder.put("s3.access-key-id", MINIO_ACCESS_KEY); + propertiesBuilder.put("s3.secret-access-key", MINIO_SECRET_KEY); + } restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast()); return restCatalog; @@ -211,11 +226,15 @@ public void cleanUp() { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testCreateTable(boolean pathStyle) throws IOException { + @CsvSource("true,") + @CsvSource("false,") + @CsvSource("true,VENDED_CREDENTIALS") + @CsvSource("false,VENDED_CREDENTIALS") + public void testCreateTable(boolean pathStyle, AccessDelegationMode dm) throws IOException { try (RESTCatalog restCatalog = - createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle)) { - LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog); + createCatalog( + Optional.of(endpoint), Optional.empty(), pathStyle, Optional.ofNullable(dm))) { + LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, Optional.ofNullable(dm)); if (pathStyle) { assertThat(loadTableResponse.config()) .containsEntry("s3.path-style-access", Boolean.TRUE.toString()); @@ -230,7 +249,8 @@ public void testInternalEndpoints() throws IOException { Optional.of("http://s3.example.com"), Optional.of(endpoint), false, - Optional.of(endpoint))) { + Optional.of(endpoint), + Optional.empty())) { StorageConfigInfo storageConfig = managementApi.getCatalog(catalogName).getStorageConfigInfo(); assertThat((AwsStorageConfigInfo) storageConfig) @@ -240,12 +260,13 @@ public void testInternalEndpoints() throws IOException { AwsStorageConfigInfo::getEndpointInternal, AwsStorageConfigInfo::getPathStyleAccess) .containsExactly("http://s3.example.com", endpoint, endpoint, false); - LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog); - assertThat(loadTableResponse.config()).containsEntry("s3.endpoint", "http://s3.example.com"); + LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, Optional.empty()); + assertThat(loadTableResponse.config()).containsEntry(ENDPOINT, "http://s3.example.com"); } } - public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOException { + public LoadTableResponse doTestCreateTable( + RESTCatalog restCatalog, Optional dm) { catalogApi.createNamespace(catalogName, "test-ns"); TableIdentifier id = TableIdentifier.of("test-ns", "t1"); Table table = restCatalog.createTable(id, SCHEMA); @@ -266,12 +287,26 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc assertThat(response.contentLength()).isGreaterThan(0); LoadTableResponse loadTableResponse = - catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL"); - assertThat(loadTableResponse.config()) - .containsKey("s3.endpoint") - .containsEntry( - AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, - "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); + catalogApi.loadTable( + catalogName, + id, + "ALL", + dm.map(v -> Map.of("X-Iceberg-Access-Delegation", v.protocolValue())).orElse(Map.of())); + + assertThat(loadTableResponse.config()).containsKey(ENDPOINT); + + if (dm.map(VENDED_CREDENTIALS::equals).orElse(false)) { + assertThat(loadTableResponse.config()) + .containsEntry( + REFRESH_CREDENTIALS_ENDPOINT, + "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); + assertThat(loadTableResponse.credentials()).hasSize(1); + } else { + assertThat(loadTableResponse.config()).doesNotContainKey(SECRET_ACCESS_KEY); + assertThat(loadTableResponse.config()).doesNotContainKey(ACCESS_KEY_ID); + assertThat(loadTableResponse.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT); + assertThat(loadTableResponse.credentials()).isEmpty(); + } restCatalog.dropTable(id); assertThat(restCatalog.tableExists(id)).isFalse(); @@ -279,10 +314,18 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAppendFiles(boolean pathStyle) throws IOException { + // TODO: @CsvSource("true,") + // @CsvSource("false,") + @CsvSource("true,VENDED_CREDENTIALS") + @CsvSource("false,VENDED_CREDENTIALS") + public void testAppendFiles(boolean pathStyle, AccessDelegationMode delegationMode) + throws IOException { try (RESTCatalog restCatalog = - createCatalog(Optional.of(endpoint), Optional.of(endpoint), pathStyle)) { + createCatalog( + Optional.of(endpoint), + Optional.of(endpoint), + pathStyle, + Optional.ofNullable(delegationMode))) { catalogApi.createNamespace(catalogName, "test-ns"); TableIdentifier id = TableIdentifier.of("test-ns", "t1"); Table table = restCatalog.createTable(id, SCHEMA); @@ -295,7 +338,8 @@ public void testAppendFiles(boolean pathStyle) throws IOException { URI.create( table .locationProvider() - .newDataLocation(String.format("test-file-%s.txt", pathStyle))); + .newDataLocation( + String.format("test-file-%s-%s.txt", pathStyle, delegationMode))); OutputFile f1 = io.newOutputFile(loc.toString()); try (PositionOutputStream os = f1.create()) { os.write("Hello World".getBytes(UTF_8)); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 2b7c85384b..d1daf8fa56 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -794,10 +794,6 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); - if (!delegationModes.contains(VENDED_CREDENTIALS)) { - return responseBuilder; - } - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { LOGGER .atDebug() @@ -808,15 +804,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential credentialDelegation.getAccessConfig( tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint); Map credentialConfig = accessConfig.credentials(); - responseBuilder.addAllConfig(credentialConfig); - responseBuilder.addAllConfig(accessConfig.extraProperties()); - if (!credentialConfig.isEmpty()) { + if (!credentialConfig.isEmpty() && delegationModes.contains(VENDED_CREDENTIALS)) { + responseBuilder.addAllConfig(credentialConfig); responseBuilder.addCredential( ImmutableCredential.builder() .prefix(tableMetadata.location()) .config(credentialConfig) .build()); } + responseBuilder.addAllConfig(accessConfig.extraProperties()); } return responseBuilder; } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 89a27307d9..c6c03e799c 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -18,6 +18,9 @@ */ package org.apache.polaris.service; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; + import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.GoogleCredentials; import jakarta.annotation.Nonnull; @@ -77,6 +80,9 @@ import org.apache.polaris.service.task.TaskExecutor; import org.mockito.Mockito; import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; public record TestServices( Clock clock, @@ -129,10 +135,21 @@ public static class Builder { private PolarisDiagnostics diagnostics = new PolarisDefaultDiagServiceImpl(); private RealmContext realmContext = TEST_REALM; private Map config = Map.of(); - private StsClient stsClient = Mockito.mock(StsClient.class); + private StsClient stsClient; private FileIOFactorySupplier fileIOFactorySupplier = MeasuredFileIOFactory::new; - private Builder() {} + private Builder() { + stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS); + AssumeRoleResponse arr = Mockito.mock(AssumeRoleResponse.class, RETURNS_DEEP_STUBS); + Mockito.when(stsClient.assumeRole(any(AssumeRoleRequest.class))).thenReturn(arr); + Mockito.when(arr.credentials()) + .thenReturn( + Credentials.builder() + .accessKeyId("test-access-key-id-111") + .secretAccessKey("test-secret-access-key-222") + .sessionToken("test-session-token-333") + .build()); + } public Builder realmContext(RealmContext realmContext) { this.realmContext = realmContext; @@ -222,7 +239,7 @@ public TestServices build() { @SuppressWarnings("unchecked") Instance externalCatalogFactory = Mockito.mock(Instance.class); - Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory); + Mockito.when(externalCatalogFactory.select(any())).thenReturn(externalCatalogFactory); Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true); IcebergCatalogAdapter catalogService = From 7d14c14259acb77773634c50fb17cd8524505219 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 19 Sep 2025 13:53:18 -0400 Subject: [PATCH 2/3] review: uncomment more test cases --- .../apache/polaris/service/it/RestCatalogMinIOSpecialIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 97bac5eeda..355dbbeb1f 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -314,8 +314,8 @@ public LoadTableResponse doTestCreateTable( } @ParameterizedTest - // TODO: @CsvSource("true,") - // @CsvSource("false,") + @CsvSource("true,") + @CsvSource("false,") @CsvSource("true,VENDED_CREDENTIALS") @CsvSource("false,VENDED_CREDENTIALS") public void testAppendFiles(boolean pathStyle, AccessDelegationMode delegationMode) From a30156c219b2d21b99db070b053c6b5a031a4539 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 19 Sep 2025 18:29:11 -0400 Subject: [PATCH 3/3] review: move asserts to top-level test methods --- .../service/it/RestCatalogMinIOSpecialIT.java | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 355dbbeb1f..7fd263e44f 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -80,6 +80,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -226,19 +227,36 @@ public void cleanUp() { } @ParameterizedTest - @CsvSource("true,") - @CsvSource("false,") - @CsvSource("true,VENDED_CREDENTIALS") - @CsvSource("false,VENDED_CREDENTIALS") - public void testCreateTable(boolean pathStyle, AccessDelegationMode dm) throws IOException { + @ValueSource(booleans = {true, false}) + public void testCreateTable(boolean pathStyle) throws IOException { + LoadTableResponse response = doTestCreateTable(pathStyle, Optional.empty()); + assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY); + assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID); + assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT); + assertThat(response.credentials()).isEmpty(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCreateTableVendedCredentials(boolean pathStyle) throws IOException { + LoadTableResponse response = doTestCreateTable(pathStyle, Optional.of(VENDED_CREDENTIALS)); + assertThat(response.config()) + .containsEntry( + REFRESH_CREDENTIALS_ENDPOINT, + "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); + assertThat(response.credentials()).hasSize(1); + } + + private LoadTableResponse doTestCreateTable(boolean pathStyle, Optional dm) + throws IOException { try (RESTCatalog restCatalog = - createCatalog( - Optional.of(endpoint), Optional.empty(), pathStyle, Optional.ofNullable(dm))) { - LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, Optional.ofNullable(dm)); + createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm)) { + LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm); if (pathStyle) { assertThat(loadTableResponse.config()) .containsEntry("s3.path-style-access", Boolean.TRUE.toString()); } + return loadTableResponse; } } @@ -295,19 +313,6 @@ public LoadTableResponse doTestCreateTable( assertThat(loadTableResponse.config()).containsKey(ENDPOINT); - if (dm.map(VENDED_CREDENTIALS::equals).orElse(false)) { - assertThat(loadTableResponse.config()) - .containsEntry( - REFRESH_CREDENTIALS_ENDPOINT, - "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); - assertThat(loadTableResponse.credentials()).hasSize(1); - } else { - assertThat(loadTableResponse.config()).doesNotContainKey(SECRET_ACCESS_KEY); - assertThat(loadTableResponse.config()).doesNotContainKey(ACCESS_KEY_ID); - assertThat(loadTableResponse.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT); - assertThat(loadTableResponse.credentials()).isEmpty(); - } - restCatalog.dropTable(id); assertThat(restCatalog.tableExists(id)).isFalse(); return loadTableResponse;