From f61681209a2f5cbb41c699feaf433857a6389e3c Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Thu, 19 Jun 2025 19:43:33 -0400 Subject: [PATCH 01/11] Add STS clients pool No functional change. Introduce a dedicated interface for StsClient suppliers and implement it using a pool of cached clients. All client are "thin" and share the same `SdkHttpClient`. The latter is closed when the server shuts down. This is a step towards supporting non-AWS S3 storage (#1530). For this reason the STS endpoint is present in new interfaces, but is not used yet. --- .../aws/AwsCredentialsStorageIntegration.java | 20 +++- .../core/storage/aws/StsClientSupplier.java | 45 ++++++++ runtime/service/build.gradle.kts | 3 + .../quarkus/config/QuarkusProducers.java | 33 ++++++ .../storage/QuarkusStorageConfiguration.java | 3 +- .../service/storage/aws/S3AccessConfig.java | 85 +++++++++++++++ .../service/storage/aws/StsClientsPool.java | 101 ++++++++++++++++++ .../quarkus/admin/PolarisAuthzTestBase.java | 4 +- ...PolarisStorageIntegrationProviderImpl.java | 13 +-- .../apache/polaris/service/TestServices.java | 2 +- 10 files changed, 295 insertions(+), 14 deletions(-) create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 04c1970dd9..cd83b51b7e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -32,6 +32,7 @@ import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.core.storage.aws.StsClientSupplier.StsDestination; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.policybuilder.iam.IamConditionOperator; import software.amazon.awssdk.policybuilder.iam.IamEffect; @@ -45,17 +46,21 @@ /** Credential vendor that supports generating */ public class AwsCredentialsStorageIntegration extends InMemoryStorageIntegration { - private final StsClient stsClient; + private final StsClientSupplier stsClientSupplier; private final Optional credentialsProvider; - public AwsCredentialsStorageIntegration(StsClient stsClient) { - this(stsClient, Optional.empty()); + public AwsCredentialsStorageIntegration(StsClient fixedClient) { + this((destination) -> fixedClient); + } + + public AwsCredentialsStorageIntegration(StsClientSupplier stsClientSupplier) { + this(stsClientSupplier, Optional.empty()); } public AwsCredentialsStorageIntegration( - StsClient stsClient, Optional credentialsProvider) { + StsClientSupplier stsClientSupplier, Optional credentialsProvider) { super(AwsCredentialsStorageIntegration.class.getName()); - this.stsClient = stsClient; + this.stsClientSupplier = stsClientSupplier; this.credentialsProvider = credentialsProvider; } @@ -87,6 +92,11 @@ public EnumMap getSubscopedCreds( .durationSeconds(storageCredentialDurationSeconds); credentialsProvider.ifPresent( cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp))); + + @SuppressWarnings("resource") + StsClient stsClient = + stsClientSupplier.stsClient(StsDestination.of(null, storageConfig.getRegion())); + AssumeRoleResponse response = stsClient.assumeRole(request.build()); EnumMap credentialMap = new EnumMap<>(StorageAccessProperty.class); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java new file mode 100644 index 0000000000..7c25dfb7ae --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.storage.aws; + +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Optional; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; +import software.amazon.awssdk.services.sts.StsClient; + +public interface StsClientSupplier { + + StsClient stsClient(StsDestination destination); + + @PolarisImmutable + interface StsDestination { + @Value.Parameter(order = 1) + Optional endpoint(); + + @Value.Parameter(order = 2) + Optional region(); + + static StsDestination of(@Nullable URI endpoint, @Nullable String region) { + return ImmutableStsDestination.of(Optional.ofNullable(endpoint), Optional.ofNullable(region)); + } + } +} diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index f7ae7dba90..3eeea0fb6e 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -79,6 +79,9 @@ dependencies { implementation("software.amazon.awssdk:sts") implementation("software.amazon.awssdk:iam-policy-builder") implementation("software.amazon.awssdk:s3") + implementation("software.amazon.awssdk:apache-client") { + exclude("commons-logging", "commons-logging") + } implementation(platform(libs.azuresdk.bom)) implementation("com.azure:azure-core") diff --git a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index e1b197005a..7bd3a6ea23 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.quarkus.config; +import io.micrometer.core.instrument.MeterRegistry; import io.smallrye.common.annotation.Identifier; import io.smallrye.context.SmallRyeManagedExecutor; import jakarta.enterprise.context.ApplicationScoped; @@ -75,12 +76,16 @@ import org.apache.polaris.service.quarkus.secrets.QuarkusSecretsManagerConfiguration; import org.apache.polaris.service.ratelimiter.RateLimiter; import org.apache.polaris.service.ratelimiter.TokenBucketFactory; +import org.apache.polaris.service.storage.aws.S3AccessConfig; +import org.apache.polaris.service.storage.aws.StsClientsPool; import org.apache.polaris.service.task.TaskHandlerConfiguration; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.context.ManagedExecutor; import org.eclipse.microprofile.context.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; public class QuarkusProducers { private static final Logger LOGGER = LoggerFactory.getLogger(QuarkusProducers.class); @@ -170,6 +175,34 @@ public UserSecretsManagerFactory userSecretsManagerFactory( return userSecretsManagerFactories.select(Identifier.Literal.of(config.type())).get(); } + @Produces + @Singleton + @Identifier("http-client-s3") + public SdkHttpClient sdkHttpClient(S3AccessConfig config) { + ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder(); + config.maxHttpConnections().ifPresent(httpClient::maxConnections); + config.readTimeout().ifPresent(httpClient::socketTimeout); + config.connectTimeout().ifPresent(httpClient::connectionTimeout); + config.connectionAcquisitionTimeout().ifPresent(httpClient::connectionAcquisitionTimeout); + config.connectionMaxIdleTime().ifPresent(httpClient::connectionMaxIdleTime); + config.connectionTimeToLive().ifPresent(httpClient::connectionTimeToLive); + config.expectContinueEnabled().ifPresent(httpClient::expectContinueEnabled); + return httpClient.build(); + } + + public void closeSdkHttpClient(@Disposes @Identifier("http-client-s3") SdkHttpClient client) { + client.close(); + } + + @Produces + @ApplicationScoped + public StsClientsPool stsClientsPool( + @Identifier("http-client-s3") SdkHttpClient httpClient, + S3AccessConfig config, + MeterRegistry meterRegistry) { + return new StsClientsPool(config, httpClient, meterRegistry); + } + /** * Eagerly initialize the in-memory default realm on startup, so that users can check the * credentials printed to stdout immediately. diff --git a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/storage/QuarkusStorageConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/storage/QuarkusStorageConfiguration.java index e18b2ed198..e06ea08a1c 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/storage/QuarkusStorageConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/storage/QuarkusStorageConfiguration.java @@ -23,9 +23,10 @@ import java.time.Duration; import java.util.Optional; import org.apache.polaris.service.storage.StorageConfiguration; +import org.apache.polaris.service.storage.aws.S3AccessConfig; @ConfigMapping(prefix = "polaris.storage") -public interface QuarkusStorageConfiguration extends StorageConfiguration { +public interface QuarkusStorageConfiguration extends StorageConfiguration, S3AccessConfig { @WithName("aws.access-key") @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java new file mode 100644 index 0000000000..b53bd8a15c --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.storage.aws; + +import java.time.Duration; +import java.util.Optional; +import java.util.OptionalInt; + +public interface S3AccessConfig { + /** Default value for {@link #sessionCacheMaxSize()}. */ + int DEFAULT_MAX_SESSION_CREDENTIAL_CACHE_ENTRIES = 1000; + + /** Default value for {@link #clientsCacheMaxSize()}. */ + int DEFAULT_MAX_STS_CLIENT_CACHE_ENTRIES = 50; + + /** Default value for {@link #sessionGracePeriod()}. */ + Duration DEFAULT_SESSION_REFRESH_GRACE_PERIOD = Duration.ofMinutes(5); + + /** + * The time period to subtract from the S3 session credentials (assumed role credentials) expiry + * time to define the time when those credentials become eligible for refreshing. + */ + Optional sessionGracePeriod(); + + default Duration effectiveSessionGracePeriod() { + return sessionGracePeriod().orElse(DEFAULT_SESSION_REFRESH_GRACE_PERIOD); + } + + /** + * Maximum number of entries to keep in the session credentials cache (assumed role credentials). + */ + OptionalInt sessionCacheMaxSize(); + + default int effectiveSessionCacheMaxSize() { + return sessionCacheMaxSize().orElse(DEFAULT_MAX_SESSION_CREDENTIAL_CACHE_ENTRIES); + } + + /** Maximum number of entries to keep in the STS clients cache. */ + OptionalInt clientsCacheMaxSize(); + + default int effectiveClientsCacheMaxSize() { + return clientsCacheMaxSize().orElse(DEFAULT_MAX_STS_CLIENT_CACHE_ENTRIES); + } + + /** Override the default maximum number of pooled connections. */ + OptionalInt maxHttpConnections(); + + /** Override the default connection read timeout. */ + Optional readTimeout(); + + /** Override the default TCP connect timeout. */ + Optional connectTimeout(); + + /** + * Override default connection acquisition timeout. This is the time a request will wait for a + * connection from the pool. + */ + Optional connectionAcquisitionTimeout(); + + /** Override default max idle time of a pooled connection. */ + Optional connectionMaxIdleTime(); + + /** Override default time-time of a pooled connection. */ + Optional connectionTimeToLive(); + + /** Override default behavior whether to expect an HTTP/100-Continue. */ + Optional expectContinueEnabled(); +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java new file mode 100644 index 0000000000..814cda5ffc --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.storage.aws; + +import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.stats.StatsCounter; +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.polaris.core.storage.aws.StsClientSupplier; +import software.amazon.awssdk.endpoints.Endpoint; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; + +/** Maintains a pool of STS clients. */ +public class StsClientsPool implements StsClientSupplier { + // CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 + + private static final String CACHE_NAME = "sts-clients"; + + private final Cache clients; + private final Function clientBuilder; + + public StsClientsPool( + S3AccessConfig effectiveSts, SdkHttpClient sdkHttpClient, MeterRegistry meterRegistry) { + this( + effectiveSts.effectiveClientsCacheMaxSize(), + key -> defaultStsClient(key, sdkHttpClient), + Optional.ofNullable(meterRegistry)); + } + + @VisibleForTesting + StsClientsPool( + int maxSize, + Function clientBuilder, + Optional meterRegistry) { + this.clientBuilder = clientBuilder; + this.clients = + Caffeine.newBuilder() + .maximumSize(maxSize) + .recordStats(() -> statsCounter(meterRegistry, maxSize)) + .build(); + } + + @Override + public StsClient stsClient(StsDestination destination) { + return clients.get(destination, clientBuilder); + } + + private static StsClient defaultStsClient(StsDestination parameters, SdkHttpClient sdkClient) { + StsClientBuilder builder = StsClient.builder(); + builder.httpClient(sdkClient); + if (parameters.endpoint().isPresent()) { + CompletableFuture endpointFuture = + completedFuture(Endpoint.builder().url(parameters.endpoint().get()).build()); + builder.endpointProvider(params -> endpointFuture); + } + + parameters.region().ifPresent(r -> builder.region(Region.of(r))); + + return builder.build(); + } + + static StatsCounter statsCounter(Optional meterRegistry, int maxSize) { + if (meterRegistry.isPresent()) { + meterRegistry + .get() + .gauge("max_entries", singletonList(Tag.of("cache", CACHE_NAME)), "", x -> maxSize); + + return new CaffeineStatsCounter(meterRegistry.get(), CACHE_NAME); + } + return StatsCounter.disabledStatsCounter(); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 1c232bf54c..f568e54b71 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -98,6 +98,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import org.mockito.Mockito; +import software.amazon.awssdk.services.sts.StsClient; /** Base class for shared test setup logic used by various Polaris authz-related tests. */ public abstract class PolarisAuthzTestBase { @@ -218,9 +219,10 @@ public Map getConfigOverrides() { @BeforeAll public static void setUpMocks() { + StsClient stsClient = Mockito.mock(StsClient.class); PolarisStorageIntegrationProviderImpl mock = new PolarisStorageIntegrationProviderImpl( - Mockito::mock, + destination -> stsClient, Optional.empty(), () -> GoogleCredentials.create(new AccessToken("abc", new Date()))); QuarkusMock.installMockForType(mock, PolarisStorageIntegrationProviderImpl.class); diff --git a/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index 253ab5d03a..eddf896feb 100644 --- a/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -38,28 +38,29 @@ import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; +import org.apache.polaris.core.storage.aws.StsClientSupplier; import org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration; import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.services.sts.StsClient; @ApplicationScoped public class PolarisStorageIntegrationProviderImpl implements PolarisStorageIntegrationProvider { - private final Supplier stsClientSupplier; + private final StsClientSupplier stsClientSupplier; private final Optional stsCredentials; private final Supplier gcpCredsProvider; @Inject - public PolarisStorageIntegrationProviderImpl(StorageConfiguration storageConfiguration) { + public PolarisStorageIntegrationProviderImpl( + StorageConfiguration storageConfiguration, StsClientSupplier stsClientSupplier) { this( - storageConfiguration.stsClientSupplier(false), + stsClientSupplier, Optional.ofNullable(storageConfiguration.stsCredentials()), storageConfiguration.gcpCredentialsSupplier()); } public PolarisStorageIntegrationProviderImpl( - Supplier stsClientSupplier, + StsClientSupplier stsClientSupplier, Optional stsCredentials, Supplier gcpCredsProvider) { this.stsClientSupplier = stsClientSupplier; @@ -80,7 +81,7 @@ public PolarisStorageIntegrationProviderImpl( case S3: storageIntegration = (PolarisStorageIntegration) - new AwsCredentialsStorageIntegration(stsClientSupplier.get(), stsCredentials); + new AwsCredentialsStorageIntegration(stsClientSupplier, stsCredentials); break; case GCS: storageIntegration = diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 48f4d713b9..555a0da15b 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -150,7 +150,7 @@ public TestServices build() { // Application level PolarisStorageIntegrationProviderImpl storageIntegrationProvider = new PolarisStorageIntegrationProviderImpl( - () -> stsClient, + (destination) -> stsClient, Optional.empty(), () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN, new Date()))); InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory = From 4534bd1e04c073a43855cfdf8ed51623c61d75d4 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 20 Jun 2025 12:02:16 -0400 Subject: [PATCH 02/11] fix LICENSE --- LICENSE | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE b/LICENSE index 4330ca1544..bccb57e967 100644 --- a/LICENSE +++ b/LICENSE @@ -310,6 +310,7 @@ This product includes code from Project Nessie. * tools/config-docs/generator/src/test/java/tests/smallrye/VeryNested.java * tools/container-spec-helper/src/main/java/org/apache/polaris/containerspec/ContainerSpecHelper.java * runtime/admin/src/main/java/org/apache/polaris/admintool/PolarisAdminTool.java +* runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java * helm/polaris/tests/logging_storage_test.yaml * helm/polaris/tests/quantity_test.yaml * helm/polaris/tests/service_monitor_test.yaml From aaf66e2ce848dfb5c166fc82e08bce1cd79b4622 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 20 Jun 2025 12:40:56 -0400 Subject: [PATCH 03/11] review: rename to StsClientProvider --- .../aws/AwsCredentialsStorageIntegration.java | 14 +++++++------- ...lientSupplier.java => StsClientProvider.java} | 16 +++++++++++++++- .../service/storage/aws/StsClientsPool.java | 4 ++-- .../PolarisStorageIntegrationProviderImpl.java | 14 +++++++------- 4 files changed, 31 insertions(+), 17 deletions(-) rename polaris-core/src/main/java/org/apache/polaris/core/storage/aws/{StsClientSupplier.java => StsClientProvider.java} (62%) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index cd83b51b7e..f5d88c9341 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -32,7 +32,7 @@ import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.StorageUtil; -import org.apache.polaris.core.storage.aws.StsClientSupplier.StsDestination; +import org.apache.polaris.core.storage.aws.StsClientProvider.StsDestination; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.policybuilder.iam.IamConditionOperator; import software.amazon.awssdk.policybuilder.iam.IamEffect; @@ -46,21 +46,21 @@ /** Credential vendor that supports generating */ public class AwsCredentialsStorageIntegration extends InMemoryStorageIntegration { - private final StsClientSupplier stsClientSupplier; + private final StsClientProvider stsClientProvider; private final Optional credentialsProvider; public AwsCredentialsStorageIntegration(StsClient fixedClient) { this((destination) -> fixedClient); } - public AwsCredentialsStorageIntegration(StsClientSupplier stsClientSupplier) { - this(stsClientSupplier, Optional.empty()); + public AwsCredentialsStorageIntegration(StsClientProvider stsClientProvider) { + this(stsClientProvider, Optional.empty()); } public AwsCredentialsStorageIntegration( - StsClientSupplier stsClientSupplier, Optional credentialsProvider) { + StsClientProvider stsClientProvider, Optional credentialsProvider) { super(AwsCredentialsStorageIntegration.class.getName()); - this.stsClientSupplier = stsClientSupplier; + this.stsClientProvider = stsClientProvider; this.credentialsProvider = credentialsProvider; } @@ -95,7 +95,7 @@ public EnumMap getSubscopedCreds( @SuppressWarnings("resource") StsClient stsClient = - stsClientSupplier.stsClient(StsDestination.of(null, storageConfig.getRegion())); + stsClientProvider.stsClient(StsDestination.of(null, storageConfig.getRegion())); AssumeRoleResponse response = stsClient.assumeRole(request.build()); EnumMap credentialMap = diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientProvider.java similarity index 62% rename from polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java rename to polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientProvider.java index 7c25dfb7ae..b5a1fefe14 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientSupplier.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/StsClientProvider.java @@ -24,17 +24,31 @@ import java.util.Optional; import org.apache.polaris.immutables.PolarisImmutable; import org.immutables.value.Value; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsBaseClientBuilder; import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.endpoints.StsEndpointProvider; -public interface StsClientSupplier { +public interface StsClientProvider { + /** + * Returns an STS client for the given destination (endpoint + region). The returned client may + * not be a fresh instance for every call, however the client is reusable for multiple concurrent + * requests from multiple threads. If the endpoint or region parameters are not specified, AWS SDK + * defaults will be used. + * + * @param destination Endpoint and Region data for the client. Both values are optional. + */ StsClient stsClient(StsDestination destination); @PolarisImmutable interface StsDestination { + /** Corresponds to {@link StsBaseClientBuilder#endpointProvider(StsEndpointProvider)} */ @Value.Parameter(order = 1) Optional endpoint(); + /** Corresponds to {@link AwsClientBuilder#region(Region)} */ @Value.Parameter(order = 2) Optional region(); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java index 814cda5ffc..b208f3b4ec 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java @@ -32,7 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import org.apache.polaris.core.storage.aws.StsClientSupplier; +import org.apache.polaris.core.storage.aws.StsClientProvider; import software.amazon.awssdk.endpoints.Endpoint; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.regions.Region; @@ -40,7 +40,7 @@ import software.amazon.awssdk.services.sts.StsClientBuilder; /** Maintains a pool of STS clients. */ -public class StsClientsPool implements StsClientSupplier { +public class StsClientsPool implements StsClientProvider { // CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 private static final String CACHE_NAME = "sts-clients"; diff --git a/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index eddf896feb..758da28745 100644 --- a/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -38,7 +38,7 @@ import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; -import org.apache.polaris.core.storage.aws.StsClientSupplier; +import org.apache.polaris.core.storage.aws.StsClientProvider; import org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration; import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -46,24 +46,24 @@ @ApplicationScoped public class PolarisStorageIntegrationProviderImpl implements PolarisStorageIntegrationProvider { - private final StsClientSupplier stsClientSupplier; + private final StsClientProvider stsClientProvider; private final Optional stsCredentials; private final Supplier gcpCredsProvider; @Inject public PolarisStorageIntegrationProviderImpl( - StorageConfiguration storageConfiguration, StsClientSupplier stsClientSupplier) { + StorageConfiguration storageConfiguration, StsClientProvider stsClientProvider) { this( - stsClientSupplier, + stsClientProvider, Optional.ofNullable(storageConfiguration.stsCredentials()), storageConfiguration.gcpCredentialsSupplier()); } public PolarisStorageIntegrationProviderImpl( - StsClientSupplier stsClientSupplier, + StsClientProvider stsClientProvider, Optional stsCredentials, Supplier gcpCredsProvider) { - this.stsClientSupplier = stsClientSupplier; + this.stsClientProvider = stsClientProvider; this.stsCredentials = stsCredentials; this.gcpCredsProvider = gcpCredsProvider; } @@ -81,7 +81,7 @@ public PolarisStorageIntegrationProviderImpl( case S3: storageIntegration = (PolarisStorageIntegration) - new AwsCredentialsStorageIntegration(stsClientSupplier, stsCredentials); + new AwsCredentialsStorageIntegration(stsClientProvider, stsCredentials); break; case GCS: storageIntegration = From 130327a854f662062918d8a9a41f4553a56d8736 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 20 Jun 2025 12:47:21 -0400 Subject: [PATCH 04/11] review: javadoc / cleanup of S3AccessConfig --- .../service/storage/aws/S3AccessConfig.java | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java index b53bd8a15c..940d03229a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/S3AccessConfig.java @@ -23,35 +23,17 @@ import java.util.Optional; import java.util.OptionalInt; +/** + * Configuration interface containing parameters for clients accessing S3 services from Polaris + * servers. + * + *

Currently, this configuration does not apply to all of Polaris code, but only to select + * services. + */ public interface S3AccessConfig { - /** Default value for {@link #sessionCacheMaxSize()}. */ - int DEFAULT_MAX_SESSION_CREDENTIAL_CACHE_ENTRIES = 1000; - /** Default value for {@link #clientsCacheMaxSize()}. */ int DEFAULT_MAX_STS_CLIENT_CACHE_ENTRIES = 50; - /** Default value for {@link #sessionGracePeriod()}. */ - Duration DEFAULT_SESSION_REFRESH_GRACE_PERIOD = Duration.ofMinutes(5); - - /** - * The time period to subtract from the S3 session credentials (assumed role credentials) expiry - * time to define the time when those credentials become eligible for refreshing. - */ - Optional sessionGracePeriod(); - - default Duration effectiveSessionGracePeriod() { - return sessionGracePeriod().orElse(DEFAULT_SESSION_REFRESH_GRACE_PERIOD); - } - - /** - * Maximum number of entries to keep in the session credentials cache (assumed role credentials). - */ - OptionalInt sessionCacheMaxSize(); - - default int effectiveSessionCacheMaxSize() { - return sessionCacheMaxSize().orElse(DEFAULT_MAX_SESSION_CREDENTIAL_CACHE_ENTRIES); - } - /** Maximum number of entries to keep in the STS clients cache. */ OptionalInt clientsCacheMaxSize(); From eea1901914b3af7c43c3ba28ae06399d12cc8ddb Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Wed, 25 Jun 2025 00:03:38 -0400 Subject: [PATCH 05/11] support customizing S3 / STS endpoints --- LICENSE | 4 + bom/build.gradle.kts | 1 + gradle/projects.main.properties | 1 + .../polaris/core/entity/CatalogEntity.java | 3 +- .../aws/AwsCredentialsStorageIntegration.java | 7 +- .../aws/AwsStorageConfigurationInfo.java | 26 +- runtime/service/build.gradle.kts | 2 + .../quarkus/it/QuarkusRestCatalogMinIoIT.java | 248 ++++++++++++++++ spec/polaris-management-service.yml | 4 + tools/minio-testcontainer/build.gradle.kts | 38 +++ .../org/apache/polaris/test/minio/Minio.java | 43 +++ .../polaris/test/minio/MinioAccess.java | 65 +++++ .../polaris/test/minio/MinioContainer.java | 274 ++++++++++++++++++ .../polaris/test/minio/MinioExtension.java | 140 +++++++++ .../test/minio/Dockerfile-minio-version | 3 + 15 files changed, 851 insertions(+), 8 deletions(-) create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java create mode 100644 tools/minio-testcontainer/build.gradle.kts create mode 100644 tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java create mode 100644 tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java create mode 100644 tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java create mode 100644 tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java create mode 100644 tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version diff --git a/LICENSE b/LICENSE index bccb57e967..be1be2d17c 100644 --- a/LICENSE +++ b/LICENSE @@ -309,6 +309,10 @@ This product includes code from Project Nessie. * tools/config-docs/generator/src/test/java/tests/smallrye/SomeEnum.java * tools/config-docs/generator/src/test/java/tests/smallrye/VeryNested.java * tools/container-spec-helper/src/main/java/org/apache/polaris/containerspec/ContainerSpecHelper.java +* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java +* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java +* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java +* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java * runtime/admin/src/main/java/org/apache/polaris/admintool/PolarisAdminTool.java * runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java * helm/polaris/tests/logging_storage_test.yaml diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 64b2319350..aef82b2edf 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { api(project(":polaris-api-management-service")) api(project(":polaris-container-spec-helper")) + api(project(":polaris-minio-testcontainer")) api(project(":polaris-immutables")) api(project(":polaris-misc-types")) api(project(":polaris-version")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index fecea71f8c..9c6aa8b1c9 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -39,6 +39,7 @@ polaris-tests=integration-tests aggregated-license-report=aggregated-license-report polaris-immutables=tools/immutables polaris-container-spec-helper=tools/container-spec-helper +polaris-minio-testcontainer=tools/minio-testcontainer polaris-version=tools/version polaris-misc-types=tools/misc-types polaris-persistence-varint=nosql/persistence/varint diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java index 99d8557f0b..f1326e4c4a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java @@ -273,7 +273,8 @@ public Builder setStorageConfigurationInfo( new ArrayList<>(allowedLocations), awsConfigModel.getRoleArn(), awsConfigModel.getExternalId(), - awsConfigModel.getRegion()); + awsConfigModel.getRegion(), + awsConfigModel.getEndpoint()); awsConfig.validateArn(awsConfigModel.getRoleArn()); config = awsConfig; break; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index f5d88c9341..03edd61300 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -93,9 +93,10 @@ public EnumMap getSubscopedCreds( credentialsProvider.ifPresent( cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp))); + URI endpointUri = storageConfig.getEndpointUri(); @SuppressWarnings("resource") StsClient stsClient = - stsClientProvider.stsClient(StsDestination.of(null, storageConfig.getRegion())); + stsClientProvider.stsClient(StsDestination.of(endpointUri, storageConfig.getRegion())); AssumeRoleResponse response = stsClient.assumeRole(request.build()); EnumMap credentialMap = @@ -118,6 +119,10 @@ public EnumMap getSubscopedCreds( credentialMap.put(StorageAccessProperty.CLIENT_REGION, storageConfig.getRegion()); } + if (endpointUri != null) { + credentialMap.put(StorageAccessProperty.AWS_ENDPOINT, endpointUri.toString()); + } + if (storageConfig.getAwsPartition().equals("aws-us-gov") && credentialMap.get(StorageAccessProperty.CLIENT_REGION) == null) { throw new IllegalArgumentException( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java index b41e545a30..13902f2561 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java @@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.net.URI; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -53,14 +54,24 @@ public class AwsStorageConfigurationInfo extends PolarisStorageConfigurationInfo @JsonProperty(value = "region") private @Nullable String region = null; + /** User ARN for the service principal */ + @JsonProperty(value = "endpoint") + private @Nullable String endpoint; + @JsonCreator public AwsStorageConfigurationInfo( @JsonProperty(value = "storageType", required = true) @Nonnull StorageType storageType, @JsonProperty(value = "allowedLocations", required = true) @Nonnull List allowedLocations, @JsonProperty(value = "roleARN", required = true) @Nonnull String roleARN, - @JsonProperty(value = "region", required = false) @Nullable String region) { - this(storageType, allowedLocations, roleARN, null, region); + @JsonProperty(value = "externalId") @Nullable String externalId, + @JsonProperty(value = "region", required = false) @Nullable String region, + @JsonProperty(value = "endpoint") @Nullable String endpoint) { + super(storageType, allowedLocations); + this.roleARN = roleARN; + this.externalId = externalId; + this.region = region; + this.endpoint = endpoint; } public AwsStorageConfigurationInfo( @@ -69,10 +80,7 @@ public AwsStorageConfigurationInfo( @Nonnull String roleARN, @Nullable String externalId, @Nullable String region) { - super(storageType, allowedLocations); - this.roleARN = roleARN; - this.externalId = externalId; - this.region = region; + this(storageType, allowedLocations, roleARN, externalId, region, null); } @Override @@ -121,6 +129,12 @@ public void setRegion(@Nullable String region) { this.region = region; } + @JsonIgnore + @Nullable + public URI getEndpointUri() { + return endpoint == null ? null : URI.create(endpoint); + } + @JsonIgnore public String getAwsAccountId() { return parseAwsAccountId(roleARN); diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 3eeea0fb6e..68c6929791 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -104,6 +104,8 @@ dependencies { testImplementation(project(":polaris-api-management-model")) testImplementation(testFixtures(project(":polaris-service-common"))) + testImplementation(project(":polaris-minio-testcontainer")) + testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests") testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests") diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java new file mode 100644 index 0000000000..b82b5cce6a --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.it; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.types.Types; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.PolarisCatalog; +import org.apache.polaris.core.admin.model.PrincipalWithCredentials; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.it.env.CatalogApi; +import org.apache.polaris.service.it.env.ClientCredentials; +import org.apache.polaris.service.it.env.ManagementApi; +import org.apache.polaris.service.it.env.PolarisApiEndpoints; +import org.apache.polaris.service.it.env.PolarisClient; +import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; +import org.apache.polaris.test.minio.Minio; +import org.apache.polaris.test.minio.MinioAccess; +import org.apache.polaris.test.minio.MinioExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +@QuarkusIntegrationTest +@TestProfile(QuarkusRestCatalogMinIoIT.Profile.class) +@ExtendWith(MinioExtension.class) +@ExtendWith(PolarisIntegrationTestExtension.class) +public class QuarkusRestCatalogMinIoIT { + + private static final String BUCKET_URI_PREFIX = "/minio-test"; + private static final String MINIO_ACCESS_KEY = "test-ak-123"; + private static final String MINIO_SECRET_KEY = "test-sk-123"; + + public static class Profile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + return ImmutableMap.builder() + .put("polaris.storage.aws.access-key", MINIO_ACCESS_KEY) + .put("polaris.storage.aws.secret-key", MINIO_SECRET_KEY) + .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", "false") + .build(); + } + } + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get(), "doc"), + optional(2, "data", Types.StringType.get())); + + private static ClientCredentials adminCredentials; + private static PolarisApiEndpoints endpoints; + private static PolarisClient client; + private static ManagementApi managementApi; + private static URI storageBase; + private static String endpoint; + private static S3Client s3Client; + + private CatalogApi catalogApi; + private RESTCatalog restCatalog; + private String catalogName; + + @BeforeAll + static void setup( + PolarisApiEndpoints apiEndpoints, + @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess, + ClientCredentials credentials) { + s3Client = minioAccess.s3Client(); + adminCredentials = credentials; + endpoints = apiEndpoints; + client = polarisClient(endpoints); + managementApi = client.managementApi(credentials); + storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX); + endpoint = minioAccess.s3endpoint(); + } + + @AfterAll + static void close() throws Exception { + client.close(); + } + + @BeforeEach + public void before(TestInfo testInfo) { + String principalName = client.newEntityName("test-user"); + String principalRoleName = client.newEntityName("test-admin"); + PrincipalWithCredentials principalCredentials = + managementApi.createPrincipalWithRole(principalName, principalRoleName); + + catalogApi = client.catalogApi(principalCredentials); + + catalogName = client.newEntityName(testInfo.getTestMethod().orElseThrow().getName()); + + AwsStorageConfigInfo storageConfig = + AwsStorageConfigInfo.builder() + .setEndpoint(endpoint) + .setRoleArn("arn:aws:iam::123456789012:role/polaris-test") + .setExternalId("externalId123") + .setUserArn("arn:aws:iam::123456789012:user/polaris-test") + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of(storageBase.toString())) + .build(); + + CatalogProperties.Builder catalogProps = + CatalogProperties.builder(storageBase.toASCIIString() + "/" + catalogName); + Catalog catalog = + PolarisCatalog.builder() + .setType(Catalog.TypeEnum.INTERNAL) + .setName(catalogName) + .setStorageConfigInfo(storageConfig) + .setProperties(catalogProps.build()) + .build(); + + managementApi.createCatalog(principalRoleName, catalog); + + String authToken = client.obtainToken(principalCredentials); + restCatalog = new RESTCatalog(); + + ImmutableMap.Builder propertiesBuilder = + ImmutableMap.builder() + .put( + org.apache.iceberg.CatalogProperties.URI, endpoints.catalogApiEndpoint().toString()) + .put(OAuth2Properties.TOKEN, authToken) + .put("warehouse", catalogName) + .put("header." + endpoints.realmHeaderName(), endpoints.realmId()) + .put("header.X-Iceberg-Access-Delegation", "vended-credentials"); + + restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast()); + } + + @AfterEach + public void cleanUp() { + client.cleanUp(adminCredentials); + } + + @Test + public void testCreateTable() { + catalogApi.createNamespace(catalogName, "test-ns"); + TableIdentifier id = TableIdentifier.of("test-ns", "t1"); + Table table = restCatalog.createTable(id, SCHEMA); + assertThat(table).isNotNull(); + assertThat(restCatalog.tableExists(id)).isTrue(); + + TableOperations ops = ((HasTableOperations) table).operations(); + URI location = URI.create(ops.current().metadataFileLocation()); + + GetObjectResponse response = + s3Client + .getObject( + GetObjectRequest.builder() + .bucket(location.getAuthority()) + .key(location.getPath().substring(1)) // drop leading slash + .build()) + .response(); + assertThat(response.contentLength()).isGreaterThan(0); + + restCatalog.dropTable(id); + assertThat(restCatalog.tableExists(id)).isFalse(); + } + + @Test + public void testAppendFiles() throws IOException { + catalogApi.createNamespace(catalogName, "test-ns"); + TableIdentifier id = TableIdentifier.of("test-ns", "t1"); + Table table = restCatalog.createTable(id, SCHEMA); + assertThat(table).isNotNull(); + + @SuppressWarnings("resource") + FileIO io = table.io(); + + URI loc = URI.create(table.locationProvider().newDataLocation("test-file1.txt")); + OutputFile f1 = io.newOutputFile(loc.toString()); + try (PositionOutputStream os = f1.create()) { + os.write("Hello World".getBytes(UTF_8)); + } + + DataFile df = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(f1.location()) + .withFormat(FileFormat.PARQUET) // bogus value + .withFileSizeInBytes(4) + .withRecordCount(1) + .build(); + + table.newAppend().appendFile(df).commit(); + + try (InputStream is = + s3Client.getObject( + GetObjectRequest.builder() + .bucket(loc.getAuthority()) + .key(loc.getPath().substring(1)) // drop leading slash + .build())) { + assertThat(new String(is.readAllBytes(), UTF_8)).isEqualTo("Hello World"); + } + } +} diff --git a/spec/polaris-management-service.yml b/spec/polaris-management-service.yml index 197db3384a..1269e7927a 100644 --- a/spec/polaris-management-service.yml +++ b/spec/polaris-management-service.yml @@ -1039,6 +1039,10 @@ components: type: string description: the aws region where data is stored example: "us-east-2" + endpoint: + type: string + description: endpoint for S3 and STS requests (optional) + example: "https://example.com" required: - roleArn diff --git a/tools/minio-testcontainer/build.gradle.kts b/tools/minio-testcontainer/build.gradle.kts new file mode 100644 index 0000000000..38351c7626 --- /dev/null +++ b/tools/minio-testcontainer/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + alias(libs.plugins.jandex) + id("polaris-server") +} + +dependencies { + api(platform(libs.testcontainers.bom)) + api("org.testcontainers:testcontainers") + + api(platform(libs.awssdk.bom)) + api("software.amazon.awssdk:s3") + + implementation(project(":polaris-container-spec-helper")) + implementation("software.amazon.awssdk:url-connection-client") + implementation(libs.guava) + + compileOnly(platform(libs.junit.bom)) + compileOnly("org.junit.jupiter:junit-jupiter-api") +} diff --git a/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java new file mode 100644 index 0000000000..be8b10eb75 --- /dev/null +++ b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.test.minio; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 +@Target({ElementType.FIELD, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface Minio { + /** Optional, use this access key instead of a random one. */ + String accessKey() default DEFAULT; + + /** Optional, use this secret key instead of a random one. */ + String secretKey() default DEFAULT; + + /** Optional, use this bucket instead of a random one. */ + String bucket() default DEFAULT; + + String DEFAULT = "minio_default_value__"; +} diff --git a/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java new file mode 100644 index 0000000000..44f5649c10 --- /dev/null +++ b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.test.minio; + +import java.net.URI; +import java.util.Map; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Provides access to Minio via a preconfigured S3 client and providing the by default randomized + * bucket and access/secret keys. + * + *

Annotate JUnit test instance or static fields or method parameters of this type with {@link + * Minio}. + */ +// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 +public interface MinioAccess { + + /** Host and port, separated by '{@code :}'. */ + String hostPort(); + + String accessKey(); + + String secretKey(); + + String bucket(); + + /** HTTP protocol endpoint. */ + String s3endpoint(); + + S3Client s3Client(); + + /** Properties needed by Apache Iceberg to access this instance. */ + Map icebergProperties(); + + /** Properties needed by Apache Hadoop to access this instance. */ + Map hadoopConfig(); + + /** S3 scheme URI including the bucket to access the given path. */ + URI s3BucketUri(String path); + + /** Convenience method to put an object into S3. */ + @SuppressWarnings("resource") + default void s3put(String key, RequestBody body) { + s3Client().putObject(b -> b.bucket(bucket()).key(key), body); + } +} diff --git a/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java new file mode 100644 index 0000000000..9c8ee210bd --- /dev/null +++ b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.test.minio; + +import com.google.common.base.Preconditions; +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import org.apache.polaris.containerspec.ContainerSpecHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 +public final class MinioContainer extends GenericContainer + implements MinioAccess, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MinioContainer.class); + + private static final int DEFAULT_PORT = 9000; + + private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; + private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + private static final String MINIO_DOMAIN = "MINIO_DOMAIN"; + + private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; + private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + private static final String MINIO_DOMAIN_NAME; + + /** + * Domain must start with "s3" in order to be recognized as an S3 endpoint by the AWS SDK with + * virtual-host-style addressing. The bucket name is expected to be the first part of the domain + * name, e.g. "bucket.s3.127-0-0-1.nip.io". + */ + private static final String MINIO_DOMAIN_NIP = "s3.127-0-0-1.nip.io"; + + /** + * Whether random bucket names cannot be used. Randomized bucket names can only be used when + * either `*.localhost` (on Linux) or `*.s3.127-0-0-1.nip.io` (on macOS, if DNS rebind protection + * is not active) can be resolved. Otherwise we have to use a fixed bucket name and users have to + * configure that in `/etc/hosts`. + */ + private static final String FIXED_BUCKET_NAME; + + static boolean canRunOnMacOs() { + return MINIO_DOMAIN_NAME.equals(MINIO_DOMAIN_NIP); + } + + static { + String name; + String fixedBucketName = null; + if (System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("linux")) { + name = "localhost"; + } else { + try { + InetAddress ignored = InetAddress.getByName(MINIO_DOMAIN_NIP); + name = MINIO_DOMAIN_NIP; + } catch (UnknownHostException x) { + LOGGER.warn( + "Could not resolve '{}', falling back to 'localhost'. " + + "This usually happens when your router or DNS provider is unable to resolve the nip.io addresses.", + MINIO_DOMAIN_NIP); + name = "localhost"; + fixedBucketName = "miniobucket"; + validateBucketHost(fixedBucketName); + } + } + MINIO_DOMAIN_NAME = name; + FIXED_BUCKET_NAME = fixedBucketName; + } + + /** Validates the bucket host name, on non-Linux, if necessary. */ + private static String validateBucketHost(String bucketName) { + if (FIXED_BUCKET_NAME != null) { + String test = bucketName + ".localhost"; + try { + InetAddress ignored = InetAddress.getByName(test); + } catch (UnknownHostException e) { + LOGGER.warn( + "Could not resolve '{}',\n Please add the line \n '127.0.0.1 {}'\n to your local '/etc/hosts' file.\n Tests are expected to fail unless name resolution works.", + test, + test); + } + } + return bucketName; + } + + private final String accessKey; + private final String secretKey; + private final String bucket; + + private String hostPort; + private String s3endpoint; + private S3Client s3; + private String region; + + @SuppressWarnings("unused") + public MinioContainer() { + this(null, null, null, null); + } + + @SuppressWarnings("resource") + public MinioContainer(String image, String accessKey, String secretKey, String bucket) { + super( + ContainerSpecHelper.containerSpecHelper("minio", MinioContainer.class) + .dockerImageName(image)); + withNetworkAliases(randomString("minio")); + withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(MinioContainer.class))); + addExposedPort(DEFAULT_PORT); + this.accessKey = accessKey != null ? accessKey : randomString("access"); + this.secretKey = secretKey != null ? secretKey : randomString("secret"); + this.bucket = + bucket != null + ? validateBucketHost(bucket) + : (FIXED_BUCKET_NAME != null ? FIXED_BUCKET_NAME : randomString("bucket")); + withEnv(MINIO_ACCESS_KEY, this.accessKey); + withEnv(MINIO_SECRET_KEY, this.secretKey); + // S3 SDK encodes bucket names in host names - need to tell Minio which domain to use + withEnv(MINIO_DOMAIN, MINIO_DOMAIN_NAME); + withCommand("server", DEFAULT_STORAGE_DIRECTORY); + setWaitStrategy( + new HttpWaitStrategy() + .forPort(DEFAULT_PORT) + .forPath(HEALTH_ENDPOINT) + .withStartupTimeout(Duration.ofMinutes(2))); + } + + public MinioContainer withRegion(String region) { + this.region = region; + return this; + } + + private static String randomString(String prefix) { + return prefix + "-" + Base58.randomString(6).toLowerCase(Locale.ROOT); + } + + @Override + public String hostPort() { + Preconditions.checkState(hostPort != null, "Container not yet started"); + return hostPort; + } + + @Override + public String accessKey() { + return accessKey; + } + + @Override + public String secretKey() { + return secretKey; + } + + @Override + public String bucket() { + return bucket; + } + + @Override + public String s3endpoint() { + Preconditions.checkState(s3endpoint != null, "Container not yet started"); + return s3endpoint; + } + + @Override + public S3Client s3Client() { + Preconditions.checkState(s3 != null, "Container not yet started"); + return s3; + } + + @Override + public Map icebergProperties() { + Map props = new HashMap<>(); + props.put("s3.access-key-id", accessKey()); + props.put("s3.secret-access-key", secretKey()); + props.put("s3.endpoint", s3endpoint()); + props.put("http-client.type", "urlconnection"); + return props; + } + + @Override + public Map hadoopConfig() { + Map r = new HashMap<>(); + r.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + r.put("fs.s3a.access.key", accessKey()); + r.put("fs.s3a.secret.key", secretKey()); + r.put("fs.s3a.endpoint", s3endpoint()); + return r; + } + + @Override + public URI s3BucketUri(String path) { + return s3BucketUri("s3", path); + } + + public URI s3BucketUri(String scheme, String path) { + Preconditions.checkState(bucket != null, "Container not yet started"); + return URI.create(String.format("%s://%s/", scheme, bucket)).resolve(path); + } + + @Override + public void start() { + super.start(); + + this.hostPort = MINIO_DOMAIN_NAME + ":" + getMappedPort(DEFAULT_PORT); + this.s3endpoint = String.format("http://%s/", hostPort); + + this.s3 = createS3Client(); + this.s3.createBucket(CreateBucketRequest.builder().bucket(bucket()).build()); + } + + @Override + public void close() { + stop(); + } + + @Override + public void stop() { + try { + if (s3 != null) { + s3.close(); + } + } finally { + s3 = null; + super.stop(); + } + } + + private S3Client createS3Client() { + return S3Client.builder() + .httpClientBuilder(UrlConnectionHttpClient.builder()) + .applyMutation(builder -> builder.endpointOverride(URI.create(s3endpoint()))) + .applyMutation( + builder -> { + if (region != null) { + builder.region(Region.of(region)); + } + }) + // .serviceConfiguration(s3Configuration(s3PathStyleAccess, s3UseArnRegionEnabled)) + // credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken) + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey(), secretKey()))) + .build(); + } +} diff --git a/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java new file mode 100644 index 0000000000..f1cdb7fb59 --- /dev/null +++ b/tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.test.minio; + +import static java.lang.String.format; +import static org.junit.jupiter.api.extension.ConditionEvaluationResult.disabled; +import static org.junit.jupiter.api.extension.ConditionEvaluationResult.enabled; +import static org.junit.platform.commons.util.AnnotationUtils.findAnnotatedFields; +import static org.junit.platform.commons.util.ReflectionUtils.makeAccessible; + +import java.lang.reflect.Field; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.junit.platform.commons.util.AnnotationUtils; +import org.junit.platform.commons.util.ExceptionUtils; +import org.junit.platform.commons.util.ReflectionUtils; + +/** + * JUnit extension that provides a Minio container configured with a single bucket. + * + *

Provides instances of {@link MinioAccess} via instance or static fields or parameters + * annotated with {@link Minio}. + */ +// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2 +public class MinioExtension + implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, ExecutionCondition { + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(MinioExtension.class); + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + if (OS.current() == OS.LINUX) { + return enabled("Running on Linux"); + } + if (OS.current() == OS.MAC + && System.getenv("CI_MAC") == null + && MinioContainer.canRunOnMacOs()) { + // Disable tests on GitHub Actions + return enabled("Running on macOS locally"); + } + return disabled(format("Disabled on %s", OS.current().name())); + } + + @Override + public void beforeAll(ExtensionContext context) { + Class testClass = context.getRequiredTestClass(); + + findAnnotatedFields(testClass, Minio.class, ReflectionUtils::isStatic) + .forEach(field -> injectField(context, field)); + } + + @Override + public void beforeEach(ExtensionContext context) { + Class testClass = context.getRequiredTestClass(); + + findAnnotatedFields(testClass, Minio.class, ReflectionUtils::isNotStatic) + .forEach(field -> injectField(context, field)); + } + + private void injectField(ExtensionContext context, Field field) { + try { + Minio minio = + AnnotationUtils.findAnnotation(field, Minio.class) + .orElseThrow(IllegalStateException::new); + + MinioAccess container = + context + .getStore(NAMESPACE) + .getOrComputeIfAbsent( + field.toString(), x -> createContainer(minio), MinioAccess.class); + + makeAccessible(field).set(context.getTestInstance().orElse(null), container); + } catch (Throwable t) { + ExceptionUtils.throwAsUncheckedException(t); + } + } + + @Override + public boolean supportsParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + if (parameterContext.findAnnotation(Minio.class).isEmpty()) { + return false; + } + return parameterContext.getParameter().getType().isAssignableFrom(MinioAccess.class); + } + + @Override + public Object resolveParameter( + ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + return extensionContext + .getStore(NAMESPACE) + .getOrComputeIfAbsent( + MinioExtension.class.getName() + '#' + parameterContext.getParameter().getName(), + k -> { + Minio minio = parameterContext.findAnnotation(Minio.class).get(); + return createContainer(minio); + }, + MinioAccess.class); + } + + private MinioAccess createContainer(Minio minio) { + String accessKey = nonDefault(minio.accessKey()); + String secretKey = nonDefault(minio.secretKey()); + String bucket = nonDefault(minio.bucket()); + MinioContainer container = + new MinioContainer(null, accessKey, secretKey, bucket).withStartupAttempts(5); + container.start(); + return container; + } + + private static String nonDefault(String s) { + return s.equals(Minio.DEFAULT) ? null : s; + } +} diff --git a/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version b/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version new file mode 100644 index 0000000000..3ca9012bb0 --- /dev/null +++ b/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version @@ -0,0 +1,3 @@ +# Dockerfile to provide the image name and tag to a test. +# Version is managed by Renovate - do not edit. +FROM quay.io/minio/minio:RELEASE.2025-04-08T15-41-24Z From 981be86baf003ab1ab7fe5a2c5702746556c3a4c Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Wed, 25 Jun 2025 00:26:03 -0400 Subject: [PATCH 06/11] fix: license header --- .../test/minio/Dockerfile-minio-version | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version b/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version index 3ca9012bb0..1dd0f4e2f5 100644 --- a/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version +++ b/tools/minio-testcontainer/src/main/resources/org/apache/polaris/test/minio/Dockerfile-minio-version @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + # Dockerfile to provide the image name and tag to a test. # Version is managed by Renovate - do not edit. FROM quay.io/minio/minio:RELEASE.2025-04-08T15-41-24Z From 6e671382c0d4a62e8e318ad63aab5d4815773c28 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Wed, 25 Jun 2025 00:52:51 -0400 Subject: [PATCH 07/11] move QuarkusRestCatalogMinIoIT to intTest --- .../polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename runtime/service/src/{test => intTest}/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java (100%) diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java similarity index 100% rename from runtime/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java rename to runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java From f66682654bbc1eb58e827c3ea9a027c90b345259 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Fri, 27 Jun 2025 20:31:16 -0400 Subject: [PATCH 08/11] review: add sts endpoint --- .../polaris/core/entity/CatalogEntity.java | 3 +- .../aws/AwsCredentialsStorageIntegration.java | 6 +- .../aws/AwsStorageConfigurationInfo.java | 27 +++- .../aws/AwsStorageConfigurationInfoTest.java | 59 ++++++++ .../quarkus/it/QuarkusRestCatalogMinIoIT.java | 129 ++++++++++-------- spec/polaris-management-service.yml | 8 +- 6 files changed, 164 insertions(+), 68 deletions(-) create mode 100644 polaris-core/src/test/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfoTest.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java index f1326e4c4a..dc57918378 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java @@ -274,7 +274,8 @@ public Builder setStorageConfigurationInfo( awsConfigModel.getRoleArn(), awsConfigModel.getExternalId(), awsConfigModel.getRegion(), - awsConfigModel.getEndpoint()); + awsConfigModel.getEndpoint(), + awsConfigModel.getStsEndpoint()); awsConfig.validateArn(awsConfigModel.getRoleArn()); config = awsConfig; break; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 03edd61300..189c574dd8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -93,10 +93,11 @@ public EnumMap getSubscopedCreds( credentialsProvider.ifPresent( cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp))); - URI endpointUri = storageConfig.getEndpointUri(); @SuppressWarnings("resource") + // Note: stsClientProvider returns "thin" clients that do not need closing StsClient stsClient = - stsClientProvider.stsClient(StsDestination.of(endpointUri, storageConfig.getRegion())); + stsClientProvider.stsClient( + StsDestination.of(storageConfig.getStsEndpointUri(), storageConfig.getRegion())); AssumeRoleResponse response = stsClient.assumeRole(request.build()); EnumMap credentialMap = @@ -119,6 +120,7 @@ public EnumMap getSubscopedCreds( credentialMap.put(StorageAccessProperty.CLIENT_REGION, storageConfig.getRegion()); } + URI endpointUri = storageConfig.getEndpointUri(); if (endpointUri != null) { credentialMap.put(StorageAccessProperty.AWS_ENDPOINT, endpointUri.toString()); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java index 13902f2561..666d4b0ea2 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java @@ -54,10 +54,14 @@ public class AwsStorageConfigurationInfo extends PolarisStorageConfigurationInfo @JsonProperty(value = "region") private @Nullable String region = null; - /** User ARN for the service principal */ + /** Endpoint URI for S3 API calls */ @JsonProperty(value = "endpoint") private @Nullable String endpoint; + /** Endpoint URI for STS API calls */ + @JsonProperty(value = "stsEndpoint") + private @Nullable String stsEndpoint; + @JsonCreator public AwsStorageConfigurationInfo( @JsonProperty(value = "storageType", required = true) @Nonnull StorageType storageType, @@ -66,12 +70,22 @@ public AwsStorageConfigurationInfo( @JsonProperty(value = "roleARN", required = true) @Nonnull String roleARN, @JsonProperty(value = "externalId") @Nullable String externalId, @JsonProperty(value = "region", required = false) @Nullable String region, - @JsonProperty(value = "endpoint") @Nullable String endpoint) { + @JsonProperty(value = "endpoint") @Nullable String endpoint, + @JsonProperty(value = "stsEndpoint") @Nullable String stsEndpoint) { super(storageType, allowedLocations); this.roleARN = roleARN; this.externalId = externalId; this.region = region; this.endpoint = endpoint; + this.stsEndpoint = stsEndpoint; + } + + public AwsStorageConfigurationInfo( + @Nonnull StorageType storageType, + @Nonnull List allowedLocations, + @Nonnull String roleARN, + @Nullable String region) { + this(storageType, allowedLocations, roleARN, null, region, null, null); } public AwsStorageConfigurationInfo( @@ -80,7 +94,7 @@ public AwsStorageConfigurationInfo( @Nonnull String roleARN, @Nullable String externalId, @Nullable String region) { - this(storageType, allowedLocations, roleARN, externalId, region, null); + this(storageType, allowedLocations, roleARN, externalId, region, null, null); } @Override @@ -135,6 +149,13 @@ public URI getEndpointUri() { return endpoint == null ? null : URI.create(endpoint); } + /** Returns the STS endpoint if set, defaulting to {@link #getEndpointUri()} otherwise. */ + @JsonIgnore + @Nullable + public URI getStsEndpointUri() { + return stsEndpoint == null ? getEndpointUri() : URI.create(stsEndpoint); + } + @JsonIgnore public String getAwsAccountId() { return parseAwsAccountId(roleARN); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfoTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfoTest.java new file mode 100644 index 0000000000..61ab64ad49 --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfoTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.storage.aws; + +import static org.apache.polaris.core.storage.PolarisStorageConfigurationInfo.StorageType.S3; +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class AwsStorageConfigurationInfoTest { + + private static AwsStorageConfigurationInfo config(String endpoint, String stsEndpoint) { + return new AwsStorageConfigurationInfo( + S3, List.of(), "role", null, null, endpoint, stsEndpoint); + } + + @Test + public void testStsEndpoint() { + assertThat(config(null, null)) + .extracting( + AwsStorageConfigurationInfo::getEndpointUri, + AwsStorageConfigurationInfo::getStsEndpointUri) + .containsExactly(null, null); + assertThat(config(null, "http://sts.example.com")) + .extracting( + AwsStorageConfigurationInfo::getEndpointUri, + AwsStorageConfigurationInfo::getStsEndpointUri) + .containsExactly(null, URI.create("http://sts.example.com")); + assertThat(config("http://s3.example.com", null)) + .extracting( + AwsStorageConfigurationInfo::getEndpointUri, + AwsStorageConfigurationInfo::getStsEndpointUri) + .containsExactly(URI.create("http://s3.example.com"), URI.create("http://s3.example.com")); + assertThat(config("http://s3.example.com", "http://sts.example.com")) + .extracting( + AwsStorageConfigurationInfo::getEndpointUri, + AwsStorageConfigurationInfo::getStsEndpointUri) + .containsExactly(URI.create("http://s3.example.com"), URI.create("http://sts.example.com")); + } +} diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java index b82b5cce6a..af918f68fd 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusRestCatalogMinIoIT.java @@ -33,6 +33,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -110,7 +111,8 @@ public Map getConfigOverrides() { private static S3Client s3Client; private CatalogApi catalogApi; - private RESTCatalog restCatalog; + private String principalRoleName; + private PrincipalWithCredentials principalCredentials; private String catalogName; @BeforeAll @@ -135,23 +137,25 @@ static void close() throws Exception { @BeforeEach public void before(TestInfo testInfo) { String principalName = client.newEntityName("test-user"); - String principalRoleName = client.newEntityName("test-admin"); - PrincipalWithCredentials principalCredentials = - managementApi.createPrincipalWithRole(principalName, principalRoleName); + principalRoleName = client.newEntityName("test-admin"); + principalCredentials = managementApi.createPrincipalWithRole(principalName, principalRoleName); catalogApi = client.catalogApi(principalCredentials); catalogName = client.newEntityName(testInfo.getTestMethod().orElseThrow().getName()); + } - AwsStorageConfigInfo storageConfig = + private RESTCatalog createCatalog(Optional endpoint, Optional stsEndpoint) { + AwsStorageConfigInfo.Builder storageConfig = AwsStorageConfigInfo.builder() - .setEndpoint(endpoint) .setRoleArn("arn:aws:iam::123456789012:role/polaris-test") .setExternalId("externalId123") .setUserArn("arn:aws:iam::123456789012:user/polaris-test") .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) - .setAllowedLocations(List.of(storageBase.toString())) - .build(); + .setAllowedLocations(List.of(storageBase.toString())); + + endpoint.ifPresent(storageConfig::setEndpoint); + stsEndpoint.ifPresent(storageConfig::setStsEndpoint); CatalogProperties.Builder catalogProps = CatalogProperties.builder(storageBase.toASCIIString() + "/" + catalogName); @@ -159,14 +163,14 @@ public void before(TestInfo testInfo) { PolarisCatalog.builder() .setType(Catalog.TypeEnum.INTERNAL) .setName(catalogName) - .setStorageConfigInfo(storageConfig) + .setStorageConfigInfo(storageConfig.build()) .setProperties(catalogProps.build()) .build(); managementApi.createCatalog(principalRoleName, catalog); String authToken = client.obtainToken(principalCredentials); - restCatalog = new RESTCatalog(); + RESTCatalog restCatalog = new RESTCatalog(); ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() @@ -178,6 +182,7 @@ public void before(TestInfo testInfo) { .put("header.X-Iceberg-Access-Delegation", "vended-credentials"); restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast()); + return restCatalog; } @AfterEach @@ -186,63 +191,67 @@ public void cleanUp() { } @Test - public void testCreateTable() { - catalogApi.createNamespace(catalogName, "test-ns"); - TableIdentifier id = TableIdentifier.of("test-ns", "t1"); - Table table = restCatalog.createTable(id, SCHEMA); - assertThat(table).isNotNull(); - assertThat(restCatalog.tableExists(id)).isTrue(); - - TableOperations ops = ((HasTableOperations) table).operations(); - URI location = URI.create(ops.current().metadataFileLocation()); - - GetObjectResponse response = - s3Client - .getObject( - GetObjectRequest.builder() - .bucket(location.getAuthority()) - .key(location.getPath().substring(1)) // drop leading slash - .build()) - .response(); - assertThat(response.contentLength()).isGreaterThan(0); - - restCatalog.dropTable(id); - assertThat(restCatalog.tableExists(id)).isFalse(); + public void testCreateTable() throws IOException { + try (RESTCatalog restCatalog = createCatalog(Optional.of(endpoint), Optional.empty())) { + catalogApi.createNamespace(catalogName, "test-ns"); + TableIdentifier id = TableIdentifier.of("test-ns", "t1"); + Table table = restCatalog.createTable(id, SCHEMA); + assertThat(table).isNotNull(); + assertThat(restCatalog.tableExists(id)).isTrue(); + + TableOperations ops = ((HasTableOperations) table).operations(); + URI location = URI.create(ops.current().metadataFileLocation()); + + GetObjectResponse response = + s3Client + .getObject( + GetObjectRequest.builder() + .bucket(location.getAuthority()) + .key(location.getPath().substring(1)) // drop leading slash + .build()) + .response(); + assertThat(response.contentLength()).isGreaterThan(0); + + restCatalog.dropTable(id); + assertThat(restCatalog.tableExists(id)).isFalse(); + } } @Test public void testAppendFiles() throws IOException { - catalogApi.createNamespace(catalogName, "test-ns"); - TableIdentifier id = TableIdentifier.of("test-ns", "t1"); - Table table = restCatalog.createTable(id, SCHEMA); - assertThat(table).isNotNull(); - - @SuppressWarnings("resource") - FileIO io = table.io(); - - URI loc = URI.create(table.locationProvider().newDataLocation("test-file1.txt")); - OutputFile f1 = io.newOutputFile(loc.toString()); - try (PositionOutputStream os = f1.create()) { - os.write("Hello World".getBytes(UTF_8)); - } + try (RESTCatalog restCatalog = createCatalog(Optional.of(endpoint), Optional.of(endpoint))) { + catalogApi.createNamespace(catalogName, "test-ns"); + TableIdentifier id = TableIdentifier.of("test-ns", "t1"); + Table table = restCatalog.createTable(id, SCHEMA); + assertThat(table).isNotNull(); - DataFile df = - DataFiles.builder(PartitionSpec.unpartitioned()) - .withPath(f1.location()) - .withFormat(FileFormat.PARQUET) // bogus value - .withFileSizeInBytes(4) - .withRecordCount(1) - .build(); + @SuppressWarnings("resource") + FileIO io = table.io(); + + URI loc = URI.create(table.locationProvider().newDataLocation("test-file1.txt")); + OutputFile f1 = io.newOutputFile(loc.toString()); + try (PositionOutputStream os = f1.create()) { + os.write("Hello World".getBytes(UTF_8)); + } + + DataFile df = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(f1.location()) + .withFormat(FileFormat.PARQUET) // bogus value + .withFileSizeInBytes(4) + .withRecordCount(1) + .build(); - table.newAppend().appendFile(df).commit(); + table.newAppend().appendFile(df).commit(); - try (InputStream is = - s3Client.getObject( - GetObjectRequest.builder() - .bucket(loc.getAuthority()) - .key(loc.getPath().substring(1)) // drop leading slash - .build())) { - assertThat(new String(is.readAllBytes(), UTF_8)).isEqualTo("Hello World"); + try (InputStream is = + s3Client.getObject( + GetObjectRequest.builder() + .bucket(loc.getAuthority()) + .key(loc.getPath().substring(1)) // drop leading slash + .build())) { + assertThat(new String(is.readAllBytes(), UTF_8)).isEqualTo("Hello World"); + } } } } diff --git a/spec/polaris-management-service.yml b/spec/polaris-management-service.yml index 1269e7927a..39c767e66b 100644 --- a/spec/polaris-management-service.yml +++ b/spec/polaris-management-service.yml @@ -1041,8 +1041,12 @@ components: example: "us-east-2" endpoint: type: string - description: endpoint for S3 and STS requests (optional) - example: "https://example.com" + description: endpoint for S3 requests (optional) + example: "https://s3.example.com:1234" + stsEndpoint: + type: string + description: endpoint for STS requests (optional). If not set, defaults to 'endpoint'. + example: "https://sts.example.com:1234" required: - roleArn From ab7b8e622b8fd5dc51d8eb5e24fe457b91350583 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Mon, 30 Jun 2025 17:47:37 -0400 Subject: [PATCH 09/11] add CHANGELOG entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0c93053a2..9593d75e20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features +- Added Catalog configuration for S3 and STS endpoints. This also allows using non-AWS S3 implementations. + ### Changes ### Deprecations From c61861b10d3d2d39f7b2b4db5e4b3d13b056a77d Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Wed, 2 Jul 2025 18:11:46 -0400 Subject: [PATCH 10/11] review: aws-sdk-http-client --- .../polaris/service/quarkus/config/QuarkusProducers.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 7bd3a6ea23..bea9da48e5 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -177,7 +177,7 @@ public UserSecretsManagerFactory userSecretsManagerFactory( @Produces @Singleton - @Identifier("http-client-s3") + @Identifier("aws-sdk-http-client") public SdkHttpClient sdkHttpClient(S3AccessConfig config) { ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder(); config.maxHttpConnections().ifPresent(httpClient::maxConnections); @@ -190,14 +190,15 @@ public SdkHttpClient sdkHttpClient(S3AccessConfig config) { return httpClient.build(); } - public void closeSdkHttpClient(@Disposes @Identifier("http-client-s3") SdkHttpClient client) { + public void closeSdkHttpClient( + @Disposes @Identifier("aws-sdk-http-client") SdkHttpClient client) { client.close(); } @Produces @ApplicationScoped public StsClientsPool stsClientsPool( - @Identifier("http-client-s3") SdkHttpClient httpClient, + @Identifier("aws-sdk-http-client") SdkHttpClient httpClient, S3AccessConfig config, MeterRegistry meterRegistry) { return new StsClientsPool(config, httpClient, meterRegistry); From 4c7250222bc43c967968e36effb1c21e8bf53456 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Thu, 3 Jul 2025 16:14:41 -0400 Subject: [PATCH 11/11] review: simplify StsClientsPool constructor args --- .../polaris/service/quarkus/config/QuarkusProducers.java | 2 +- .../apache/polaris/service/storage/aws/StsClientsPool.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index bea9da48e5..4190b55b50 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -201,7 +201,7 @@ public StsClientsPool stsClientsPool( @Identifier("aws-sdk-http-client") SdkHttpClient httpClient, S3AccessConfig config, MeterRegistry meterRegistry) { - return new StsClientsPool(config, httpClient, meterRegistry); + return new StsClientsPool(config.effectiveClientsCacheMaxSize(), httpClient, meterRegistry); } /** diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java index b208f3b4ec..170274d40a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java @@ -49,9 +49,9 @@ public class StsClientsPool implements StsClientProvider { private final Function clientBuilder; public StsClientsPool( - S3AccessConfig effectiveSts, SdkHttpClient sdkHttpClient, MeterRegistry meterRegistry) { + int clientsCacheMaxSize, SdkHttpClient sdkHttpClient, MeterRegistry meterRegistry) { this( - effectiveSts.effectiveClientsCacheMaxSize(), + clientsCacheMaxSize, key -> defaultStsClient(key, sdkHttpClient), Optional.ofNullable(meterRegistry)); }