Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,9 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisCredentialVendor;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;

/**
* A default FileIO factory implementation for creating Iceberg {@link FileIO} instances with
Expand All @@ -52,15 +47,11 @@
@Identifier("default")
public class DefaultFileIOFactory implements FileIOFactory {

private final StorageCredentialCache storageCredentialCache;
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final AccessConfigProvider accessConfigProvider;

@Inject
public DefaultFileIOFactory(
StorageCredentialCache storageCredentialCache,
MetaStoreManagerFactory metaStoreManagerFactory) {
this.storageCredentialCache = storageCredentialCache;
this.metaStoreManagerFactory = metaStoreManagerFactory;
public DefaultFileIOFactory(AccessConfigProvider accessConfigProvider) {
this.accessConfigProvider = accessConfigProvider;
}

@Override
Expand All @@ -72,36 +63,25 @@ public FileIO loadFileIO(
@Nonnull Set<String> tableLocations,
@Nonnull Set<PolarisStorageActions> storageActions,
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
RealmContext realmContext = callContext.getRealmContext();
PolarisCredentialVendor credentialVendor =
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);

// Get subcoped creds
properties = new HashMap<>(properties);
Optional<PolarisEntity> storageInfoEntity =
FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath);
Optional<AccessConfig> accessConfig =
storageInfoEntity.map(
storageInfo ->
FileIOUtil.refreshAccessConfig(
callContext,
storageCredentialCache,
credentialVendor,
identifier,
tableLocations,
storageActions,
storageInfo,
Optional.empty()));
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
identifier,
tableLocations,
storageActions,
Optional.empty(),
resolvedEntityPath);

// Update the FileIO with the subscoped credentials
// Update with properties in case there are table-level overrides the credentials should
// always override table-level properties, since storage configuration will be found at
// whatever entity defines it
if (accessConfig.isPresent()) {
properties.putAll(accessConfig.get().credentials());
properties.putAll(accessConfig.get().extraProperties());
properties.putAll(accessConfig.get().internalProperties());
}
properties.putAll(accessConfig.credentials());
properties.putAll(accessConfig.extraProperties());
properties.putAll(accessConfig.internalProperties());

return loadFileIOInternal(ioImplClassName, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;

/** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
@ApplicationScoped
Expand All @@ -40,11 +38,8 @@ public class WasbTranslatingFileIOFactory implements FileIOFactory {
private final FileIOFactory defaultFileIOFactory;

@Inject
public WasbTranslatingFileIOFactory(
StorageCredentialCache storageCredentialCache,
MetaStoreManagerFactory metaStoreManagerFactory) {
defaultFileIOFactory =
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
public WasbTranslatingFileIOFactory(AccessConfigProvider accessConfigProvider) {
defaultFileIOFactory = new DefaultFileIOFactory(accessConfigProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static String makeTableLocation(

public PolarisS3InteroperabilityTest() {
TestServices.FileIOFactorySupplier fileIOFactorySupplier =
(storageCredentialCache, metaStoreManagerFactory) ->
(accessConfigProvider) ->
(FileIOFactory)
(callContext,
ioImplClassName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -120,6 +121,7 @@ public abstract class AbstractPolarisGenericTableCatalogTest {
private PolarisPrincipal authenticatedRoot;
private PolarisEntity catalogEntity;
private SecurityContext securityContext;
private AccessConfigProvider accessConfigProvider;

protected static final Schema SCHEMA =
new Schema(
Expand Down Expand Up @@ -156,6 +158,8 @@ public void before(TestInfo testInfo) {
metaStoreManagerFactory.getOrCreateSession(realmContext),
configurationStore);
realmConfig = polarisContext.getRealmConfig();
accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);

PrincipalEntity rootPrincipal =
metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
Expand Down Expand Up @@ -211,7 +215,7 @@ public void before(TestInfo testInfo) {
new PolarisPassthroughResolutionView(
resolutionManifestFactory, securityContext, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
this.fileIOFactory = new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider);

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.Profiles;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
import org.apache.polaris.service.catalog.io.FileIOFactory;
Expand Down Expand Up @@ -249,6 +250,7 @@ public Map<String, String> getConfigOverrides() {
private SecurityContext securityContext;
private TestPolarisEventListener testPolarisEventListener;
private ReservedProperties reservedProperties;
private AccessConfigProvider accessConfigProvider;

@BeforeAll
public static void setUpMocks() {
Expand Down Expand Up @@ -286,7 +288,8 @@ public void before(TestInfo testInfo) {
metaStoreManagerFactory.getOrCreateSession(realmContext),
configurationStore);
realmConfig = polarisContext.getRealmConfig();

accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);
EntityCache entityCache = createEntityCache(diagServices, realmConfig, metaStoreManager);
resolverFactory =
(securityContext, referenceCatalogName) ->
Expand Down Expand Up @@ -352,7 +355,7 @@ public void before(TestInfo testInfo) {
.build()
.asCatalog(serviceIdentityProvider)));

this.fileIOFactory = new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider);

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
Expand Down Expand Up @@ -996,8 +999,7 @@ public void testValidateNotificationFailToCreateFileIO() {
// filename.
final String tableLocation = "s3://externally-owned-bucket/validate_table/";
final String tableMetadataLocation = tableLocation + "metadata/";
FileIOFactory fileIOFactory =
spy(new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory));
FileIOFactory fileIOFactory = spy(new DefaultFileIOFactory(accessConfigProvider));
IcebergCatalog catalog = newIcebergCatalog(catalog().name(), metaStoreManager, fileIOFactory);
catalog.initialize(
CATALOG_NAME,
Expand Down Expand Up @@ -1914,8 +1916,7 @@ public void testDropTableWithPurge() {
.containsEntry(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), SECRET_ACCESS_KEY)
.containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), SESSION_TOKEN);
FileIO fileIO =
new TaskFileIOSupplier(
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory))
new TaskFileIOSupplier(new DefaultFileIOFactory(accessConfigProvider))
.apply(taskEntity, TABLE, polarisContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
Expand Down Expand Up @@ -2041,8 +2042,7 @@ static Stream<Arguments> testRetriableException() {

@Test
public void testFileIOWrapper() {
MeasuredFileIOFactory measured =
new MeasuredFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
MeasuredFileIOFactory measured = new MeasuredFileIOFactory(accessConfigProvider);
IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, metaStoreManager, measured);
catalog.initialize(
CATALOG_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.Profiles;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -123,6 +124,7 @@ public Map<String, String> getConfigOverrides() {
private UserSecretsManager userSecretsManager;
private PolarisCallContext polarisContext;
private RealmConfig realmConfig;
private AccessConfigProvider accessConfigProvider;

private TestPolarisEventListener testPolarisEventListener;

Expand Down Expand Up @@ -163,7 +165,8 @@ public void before(TestInfo testInfo) {
metaStoreManagerFactory.getOrCreateSession(realmContext),
configurationStore);
realmConfig = polarisContext.getRealmConfig();

accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);
PrincipalEntity rootPrincipal =
metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
PolarisPrincipal authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of());
Expand Down Expand Up @@ -207,8 +210,7 @@ public void before(TestInfo testInfo) {
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
resolutionManifestFactory, securityContext, CATALOG_NAME);
FileIOFactory fileIOFactory =
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
FileIOFactory fileIOFactory = new DefaultFileIOFactory(accessConfigProvider);

testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
testPolarisEventListener.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1899,7 +1899,7 @@ public void testSendNotificationSufficientPrivileges() {
resolverFactory,
managerFactory,
Mockito.mock(),
new DefaultFileIOFactory(storageCredentialCache, managerFactory),
new DefaultFileIOFactory(accessConfigProvider),
polarisEventListener) {
@Override
public Catalog createCallContextCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public void before(TestInfo testInfo) {

// Spy FileIOFactory and check if the credentials are passed to the FileIO
TestServices.FileIOFactorySupplier fileIOFactorySupplier =
(storageCredentialCache, metaStoreManagerFactory) ->
(accessConfigProvider) ->
Mockito.spy(
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory) {
new DefaultFileIOFactory(accessConfigProvider) {
@Override
FileIO loadFileIOInternal(
@Nonnull String ioImplClassName, @Nonnull Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.AccessConfigProvider;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -146,6 +147,7 @@ public abstract class AbstractPolicyCatalogTest {
private PolarisPrincipal authenticatedRoot;
private PolarisEntity catalogEntity;
private SecurityContext securityContext;
private AccessConfigProvider accessConfigProvider;

@BeforeAll
public static void setUpMocks() {
Expand Down Expand Up @@ -177,6 +179,8 @@ public void before(TestInfo testInfo) {
metaStoreManagerFactory.getOrCreateSession(realmContext),
configurationStore);
realmConfig = polarisContext.getRealmConfig();
accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);

PrincipalEntity rootPrincipal =
metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
Expand Down Expand Up @@ -230,7 +234,7 @@ public void before(TestInfo testInfo) {
new PolarisPassthroughResolutionView(
resolutionManifestFactory, securityContext, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
this.fileIOFactory = new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
this.fileIOFactory = new DefaultFileIOFactory(accessConfigProvider);

StsClient stsClient = Mockito.mock(StsClient.class);
when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
Expand Down Expand Up @@ -116,8 +116,7 @@ public record TestServices(
private static final String GCP_ACCESS_TOKEN = "abc";

@FunctionalInterface
public interface FileIOFactorySupplier
extends BiFunction<StorageCredentialCache, MetaStoreManagerFactory, FileIOFactory> {}
public interface FileIOFactorySupplier extends Function<AccessConfigProvider, FileIOFactory> {}

private static class MockedConfigurationStore implements PolarisConfigurationStore {
private final Map<String, Object> defaults;
Expand All @@ -144,7 +143,8 @@ public static class Builder {
private RealmContext realmContext = TEST_REALM;
private Map<String, Object> config = Map.of();
private StsClient stsClient;
private FileIOFactorySupplier fileIOFactorySupplier = MeasuredFileIOFactory::new;
private FileIOFactorySupplier fileIOFactorySupplier =
metaStoreManagerFactory1 -> new MeasuredFileIOFactory(metaStoreManagerFactory1);

private Builder() {
stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS);
Expand Down Expand Up @@ -242,8 +242,9 @@ public TestServices build() {
PolarisCredentialManager credentialManager =
new DefaultPolarisCredentialManager(realmContext, mockCredentialVendors);

FileIOFactory fileIOFactory =
fileIOFactorySupplier.apply(storageCredentialCache, metaStoreManagerFactory);
AccessConfigProvider accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);
FileIOFactory fileIOFactory = fileIOFactorySupplier.apply(accessConfigProvider);

TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);

Expand All @@ -257,9 +258,6 @@ public TestServices build() {
fileIOFactory,
polarisEventListener);

AccessConfigProvider accessConfigProvider =
new AccessConfigProvider(storageCredentialCache, metaStoreManagerFactory);

ReservedProperties reservedProperties = ReservedProperties.NONE;

CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(realmConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;

/**
* A FileIOFactory that measures the number of bytes read, files written, and files deleted. It can
Expand All @@ -52,11 +50,8 @@ public class MeasuredFileIOFactory implements FileIOFactory {
private final FileIOFactory defaultFileIOFactory;

@Inject
public MeasuredFileIOFactory(
StorageCredentialCache storageCredentialCache,
MetaStoreManagerFactory metaStoreManagerFactory) {
defaultFileIOFactory =
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory);
public MeasuredFileIOFactory(AccessConfigProvider accessConfigProvider) {
defaultFileIOFactory = new DefaultFileIOFactory(accessConfigProvider);
}

@Override
Expand Down