diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 80845608da..7fd263e44f 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -19,8 +19,13 @@ package org.apache.polaris.service.it; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; import static org.assertj.core.api.Assertions.assertThat; @@ -42,7 +47,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -57,6 +61,7 @@ import org.apache.polaris.core.admin.model.PolarisCatalog; import org.apache.polaris.core.admin.model.PrincipalWithCredentials; import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.catalog.AccessDelegationMode; import org.apache.polaris.service.it.env.CatalogApi; import org.apache.polaris.service.it.env.ClientCredentials; import org.apache.polaris.service.it.env.ManagementApi; @@ -74,6 +79,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -112,7 +118,6 @@ public Map getConfigOverrides() { required(1, "id", Types.IntegerType.get(), "doc"), optional(2, "data", Types.StringType.get())); - private static ClientCredentials adminCredentials; private static PolarisApiEndpoints endpoints; private static PolarisClient client; private static ManagementApi managementApi; @@ -131,7 +136,6 @@ static void setup( @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess, ClientCredentials credentials) { s3Client = minioAccess.s3Client(); - adminCredentials = credentials; endpoints = apiEndpoints; client = polarisClient(endpoints); adminToken = client.obtainToken(credentials); @@ -158,15 +162,19 @@ public void before(TestInfo testInfo) { } private RESTCatalog createCatalog( - Optional endpoint, Optional stsEndpoint, boolean pathStyleAccess) { - return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty()); + Optional endpoint, + Optional stsEndpoint, + boolean pathStyleAccess, + Optional delegationMode) { + return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty(), delegationMode); } private RESTCatalog createCatalog( Optional endpoint, Optional stsEndpoint, boolean pathStyleAccess, - Optional endpointInternal) { + Optional endpointInternal, + Optional delegationMode) { AwsStorageConfigInfo.Builder storageConfig = AwsStorageConfigInfo.builder() .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) @@ -198,8 +206,16 @@ private RESTCatalog createCatalog( org.apache.iceberg.CatalogProperties.URI, endpoints.catalogApiEndpoint().toString()) .put(OAuth2Properties.TOKEN, authToken) .put("warehouse", catalogName) - .putAll(endpoints.extraHeaders("header.")) - .put("header.X-Iceberg-Access-Delegation", "vended-credentials"); + .putAll(endpoints.extraHeaders("header.")); + + delegationMode.ifPresent( + dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation", dm.protocolValue())); + + if (delegationMode.isEmpty()) { + // Use local credentials on the client side + propertiesBuilder.put("s3.access-key-id", MINIO_ACCESS_KEY); + propertiesBuilder.put("s3.secret-access-key", MINIO_SECRET_KEY); + } restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast()); return restCatalog; @@ -213,13 +229,34 @@ public void cleanUp() { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testCreateTable(boolean pathStyle) throws IOException { + LoadTableResponse response = doTestCreateTable(pathStyle, Optional.empty()); + assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY); + assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID); + assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT); + assertThat(response.credentials()).isEmpty(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCreateTableVendedCredentials(boolean pathStyle) throws IOException { + LoadTableResponse response = doTestCreateTable(pathStyle, Optional.of(VENDED_CREDENTIALS)); + assertThat(response.config()) + .containsEntry( + REFRESH_CREDENTIALS_ENDPOINT, + "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); + assertThat(response.credentials()).hasSize(1); + } + + private LoadTableResponse doTestCreateTable(boolean pathStyle, Optional dm) + throws IOException { try (RESTCatalog restCatalog = - createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle)) { - LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog); + createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm)) { + LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm); if (pathStyle) { assertThat(loadTableResponse.config()) .containsEntry("s3.path-style-access", Boolean.TRUE.toString()); } + return loadTableResponse; } } @@ -230,7 +267,8 @@ public void testInternalEndpoints() throws IOException { Optional.of("http://s3.example.com"), Optional.of(endpoint), false, - Optional.of(endpoint))) { + Optional.of(endpoint), + Optional.empty())) { StorageConfigInfo storageConfig = managementApi.getCatalog(catalogName).getStorageConfigInfo(); assertThat((AwsStorageConfigInfo) storageConfig) @@ -240,12 +278,13 @@ public void testInternalEndpoints() throws IOException { AwsStorageConfigInfo::getEndpointInternal, AwsStorageConfigInfo::getPathStyleAccess) .containsExactly("http://s3.example.com", endpoint, endpoint, false); - LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog); - assertThat(loadTableResponse.config()).containsEntry("s3.endpoint", "http://s3.example.com"); + LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, Optional.empty()); + assertThat(loadTableResponse.config()).containsEntry(ENDPOINT, "http://s3.example.com"); } } - public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOException { + public LoadTableResponse doTestCreateTable( + RESTCatalog restCatalog, Optional dm) { catalogApi.createNamespace(catalogName, "test-ns"); TableIdentifier id = TableIdentifier.of("test-ns", "t1"); Table table = restCatalog.createTable(id, SCHEMA); @@ -266,12 +305,13 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc assertThat(response.contentLength()).isGreaterThan(0); LoadTableResponse loadTableResponse = - catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL"); - assertThat(loadTableResponse.config()) - .containsKey("s3.endpoint") - .containsEntry( - AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, - "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials"); + catalogApi.loadTable( + catalogName, + id, + "ALL", + dm.map(v -> Map.of("X-Iceberg-Access-Delegation", v.protocolValue())).orElse(Map.of())); + + assertThat(loadTableResponse.config()).containsKey(ENDPOINT); restCatalog.dropTable(id); assertThat(restCatalog.tableExists(id)).isFalse(); @@ -279,10 +319,18 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAppendFiles(boolean pathStyle) throws IOException { + @CsvSource("true,") + @CsvSource("false,") + @CsvSource("true,VENDED_CREDENTIALS") + @CsvSource("false,VENDED_CREDENTIALS") + public void testAppendFiles(boolean pathStyle, AccessDelegationMode delegationMode) + throws IOException { try (RESTCatalog restCatalog = - createCatalog(Optional.of(endpoint), Optional.of(endpoint), pathStyle)) { + createCatalog( + Optional.of(endpoint), + Optional.of(endpoint), + pathStyle, + Optional.ofNullable(delegationMode))) { catalogApi.createNamespace(catalogName, "test-ns"); TableIdentifier id = TableIdentifier.of("test-ns", "t1"); Table table = restCatalog.createTable(id, SCHEMA); @@ -295,7 +343,8 @@ public void testAppendFiles(boolean pathStyle) throws IOException { URI.create( table .locationProvider() - .newDataLocation(String.format("test-file-%s.txt", pathStyle))); + .newDataLocation( + String.format("test-file-%s-%s.txt", pathStyle, delegationMode))); OutputFile f1 = io.newOutputFile(loc.toString()); try (PositionOutputStream os = f1.create()) { os.write("Hello World".getBytes(UTF_8)); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 2b7c85384b..d1daf8fa56 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -794,10 +794,6 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); - if (!delegationModes.contains(VENDED_CREDENTIALS)) { - return responseBuilder; - } - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { LOGGER .atDebug() @@ -808,15 +804,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential credentialDelegation.getAccessConfig( tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint); Map credentialConfig = accessConfig.credentials(); - responseBuilder.addAllConfig(credentialConfig); - responseBuilder.addAllConfig(accessConfig.extraProperties()); - if (!credentialConfig.isEmpty()) { + if (!credentialConfig.isEmpty() && delegationModes.contains(VENDED_CREDENTIALS)) { + responseBuilder.addAllConfig(credentialConfig); responseBuilder.addCredential( ImmutableCredential.builder() .prefix(tableMetadata.location()) .config(credentialConfig) .build()); } + responseBuilder.addAllConfig(accessConfig.extraProperties()); } return responseBuilder; } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 89a27307d9..c6c03e799c 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -18,6 +18,9 @@ */ package org.apache.polaris.service; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; + import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.GoogleCredentials; import jakarta.annotation.Nonnull; @@ -77,6 +80,9 @@ import org.apache.polaris.service.task.TaskExecutor; import org.mockito.Mockito; import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; public record TestServices( Clock clock, @@ -129,10 +135,21 @@ public static class Builder { private PolarisDiagnostics diagnostics = new PolarisDefaultDiagServiceImpl(); private RealmContext realmContext = TEST_REALM; private Map config = Map.of(); - private StsClient stsClient = Mockito.mock(StsClient.class); + private StsClient stsClient; private FileIOFactorySupplier fileIOFactorySupplier = MeasuredFileIOFactory::new; - private Builder() {} + private Builder() { + stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS); + AssumeRoleResponse arr = Mockito.mock(AssumeRoleResponse.class, RETURNS_DEEP_STUBS); + Mockito.when(stsClient.assumeRole(any(AssumeRoleRequest.class))).thenReturn(arr); + Mockito.when(arr.credentials()) + .thenReturn( + Credentials.builder() + .accessKeyId("test-access-key-id-111") + .secretAccessKey("test-secret-access-key-222") + .sessionToken("test-session-token-333") + .build()); + } public Builder realmContext(RealmContext realmContext) { this.realmContext = realmContext; @@ -222,7 +239,7 @@ public TestServices build() { @SuppressWarnings("unchecked") Instance externalCatalogFactory = Mockito.mock(Instance.class); - Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory); + Mockito.when(externalCatalogFactory.select(any())).thenReturn(externalCatalogFactory); Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true); IcebergCatalogAdapter catalogService =