Merged. Changes from all commits.
@@ -161,6 +161,7 @@ public Map<String, String> getConfigOverrides() {
   @Inject PolarisDiagnostics diagServices;

   private BasePolarisCatalog catalog;
+  private CallContext callContext;
   private AwsStorageConfigInfo storageConfigModel;
   private StsClient stsClient;
   private String realmName;
@@ -199,8 +200,7 @@ public void before(TestInfo testInfo) {
         new PolarisEntityManager(
             metaStoreManager, new StorageCredentialCache(), new EntityCache(metaStoreManager));

-    CallContext callContext = CallContext.of(realmContext, polarisContext);
-    CallContext.setCurrentContext(callContext);
+    callContext = CallContext.of(realmContext, polarisContext);

     PrincipalEntity rootEntity =
         new PrincipalEntity(
@@ -527,7 +527,7 @@ public void testValidateNotificationFailToCreateFileIO() {
     final String tableMetadataLocation = tableLocation + "metadata/";
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            CallContext.getCurrentContext(), entityManager, securityContext, catalog().name());
+            callContext, entityManager, securityContext, catalog().name());
     FileIOFactory fileIOFactory =
         spy(
             new DefaultFileIOFactory(
@@ -538,7 +538,7 @@ public void testValidateNotificationFailToCreateFileIO() {
         new BasePolarisCatalog(
             entityManager,
             metaStoreManager,
-            CallContext.getCurrentContext(),
+            callContext,
             passthroughView,
             securityContext,
             Mockito.mock(TaskExecutor.class),
@@ -854,7 +854,6 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() {
             .setName(catalogWithoutStorage)
             .build());

-    CallContext callContext = CallContext.getCurrentContext();
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             callContext, entityManager, securityContext, catalogWithoutStorage);
@@ -919,7 +918,6 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() {
             .setName(catalogName)
             .build());

-    CallContext callContext = CallContext.getCurrentContext();
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             callContext, entityManager, securityContext, catalogName);
@@ -1434,7 +1432,7 @@ public void testDropTableWithPurge() {
                 new RealmEntityManagerFactory(metaStoreManagerFactory),
                 metaStoreManagerFactory,
                 configurationStore))
-            .apply(taskEntity, () -> realmName);
+            .apply(taskEntity, callContext);
     Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
   }

@@ -1461,8 +1459,6 @@ public void testDropTableWithPurgeDisabled() {
             .addProperty(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "false")
             .setStorageConfigurationInfo(noPurgeStorageConfigModel, storageLocation)
             .build());
-    RealmContext realmContext = () -> "realm";
-    CallContext callContext = CallContext.of(realmContext, polarisContext);
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             callContext, entityManager, securityContext, noPurgeCatalogName);
@@ -1542,9 +1538,6 @@ public void testRetriableException() {

   @Test
   public void testFileIOWrapper() {
-    RealmContext realmContext = () -> "realm";
-    CallContext callContext = CallContext.of(realmContext, polarisContext);
-    CallContext.setCurrentContext(callContext);
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
             callContext, entityManager, securityContext, CATALOG_NAME);
@@ -1600,15 +1593,15 @@ public void testFileIOWrapper() {
         new FileIOFactory() {
           @Override
           public FileIO loadFileIO(
-              @NotNull RealmContext realmContext,
+              @NotNull CallContext callContext,
               @NotNull String ioImplClassName,
               @NotNull Map<String, String> properties,
               @NotNull TableIdentifier identifier,
               @NotNull Set<String> tableLocations,
               @NotNull Set<PolarisStorageActions> storageActions,
               @NotNull PolarisResolvedPathWrapper resolvedEntityPath) {
             return measured.loadFileIO(
-                realmContext,
+                callContext,
                 "org.apache.iceberg.inmemory.InMemoryFileIO",
                 Map.of(),
                 TABLE,

==================== next file ====================

@@ -76,7 +76,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
         new FileIOFactory() {
           @Override
           public FileIO loadFileIO(
-              @NotNull RealmContext realmContext,
+              @NotNull CallContext callContext,
               @NotNull String ioImplClassName,
               @NotNull Map<String, String> properties,
               @NotNull TableIdentifier identifier,

==================== next file ====================

@@ -75,7 +75,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
         new FileIOFactory() {
           @Override
           public FileIO loadFileIO(
-              @Nonnull RealmContext realmContext,
+              @Nonnull CallContext callContext,
               @Nonnull String ioImplClassName,
               @Nonnull Map<String, String> properties,
               @Nonnull TableIdentifier identifier,

==================== next file ====================

@@ -836,10 +836,9 @@ public Map<String, String> getCredentialConfig(
       return Map.of();
     }
     return FileIOUtil.refreshCredentials(
-        callContext.getRealmContext(),
+        callContext,
         entityManager,
         getCredentialVendor(),
-        callContext.getPolarisCallContext().getMetaStore(),
         callContext.getPolarisCallContext().getConfigurationStore(),
         tableIdentifier,
         getLocationsAllowedToBeAccessed(tableMetadata),
@@ -1614,7 +1613,7 @@ private FileIO loadFileIOForTableLike(
     // Reload fileIO based on table specific context
     FileIO fileIO =
         fileIOFactory.loadFileIO(
-            callContext.getRealmContext(),
+            callContext,
             ioImplClassName,
             tableProperties,
             identifier,
@@ -2077,13 +2076,7 @@ private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
         new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity));
     Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL);
     return fileIOFactory.loadFileIO(
-        callContext.getRealmContext(),
-        ioImpl,
-        properties,
-        identifier,
-        locations,
-        storageActions,
-        resolvedPath);
+        callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
   }

   private void blockedUserSpecifiedWriteLocation(Map<String, String> properties) {

==================== next file ====================

@@ -32,6 +32,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
@@ -70,13 +71,14 @@ public DefaultFileIOFactory(

   @Override
   public FileIO loadFileIO(
-      @Nonnull RealmContext realmContext,
+      @Nonnull CallContext callContext,
       @Nonnull String ioImplClassName,
       @Nonnull Map<String, String> properties,
       @Nonnull TableIdentifier identifier,
       @Nonnull Set<String> tableLocations,
       @Nonnull Set<PolarisStorageActions> storageActions,
       @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+    RealmContext realmContext = callContext.getRealmContext();
     PolarisEntityManager entityManager =
         realmEntityManagerFactory.getOrCreateEntityManager(realmContext);
     PolarisCredentialVendor credentialVendor =
@@ -93,10 +95,9 @@ public FileIO loadFileIO(
             .map(
                 storageInfo ->
                     FileIOUtil.refreshCredentials(
-                        realmContext,
+                        callContext,
                         entityManager,
                         credentialVendor,
-                        metaStoreSession,
                         configurationStore,
                         identifier,
                         tableLocations,

==================== next file ====================

@@ -24,7 +24,7 @@
 import java.util.Set;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.storage.PolarisStorageActions;

@@ -41,7 +41,7 @@ public interface FileIOFactory {
    * <p>This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring
    * secure and limited access to the table's data and locations.
    *
-   * @param realmContext the realm for which the FileIO is being loaded.
+   * @param callContext the call for which the FileIO is being loaded.
    * @param ioImplClassName the class name of the FileIO implementation to load.
    * @param properties configuration properties for the FileIO.
    * @param identifier the table identifier.
@@ -51,7 +51,7 @@ public interface FileIOFactory {
    * @return a configured FileIO instance.
    */
   FileIO loadFileIO(
-      @Nonnull RealmContext realmContext,
+      @Nonnull CallContext callContext,

[Review thread on the new callContext parameter]

Contributor: This exposes PolarisMetaStoreSession (via PolarisCallContext). Do FileIO factories actually need PolarisCallContext?

Contributor: What part of CallContext is actually required by FileIO factories?

Contributor: To clarify: we're exposing extra information here. Removing what has been exposed is much harder than exposing more details later.

Contributor (author): Yeah, it actually is already needed, because this path uses getOrGenerateSubScopeCreds, which for now is still in the main persistence interface (it's starting to be separated out via the PolarisCredentialVendor interface, but that still needs more work to fully split, and either way the transacting impls need the MetaStoreSession to get the subscoped creds). So it actually needs all of PolarisCallContext right now (and before this PR it was being accessed via the ThreadLocal).

Contributor (author): That's a good point to keep in mind, though: right now CallContext inadvertently hands a big bag of goodies to the methods being called, so we should still be careful to think about whether a smaller subset can be carved out to minimize the surface area to what is actually used.

Contributor: getOrGenerateSubScopeCreds is defined in StorageCredentialCache, not in PolarisMetaStoreSession. I did not block this PR, but I think this API change is far from ideal. @dennishuo: would you mind refactoring it some more to reduce the scope of services exposed to FileIO factories?

       @Nonnull String ioImplClassName,
       @Nonnull Map<String, String> properties,
       @Nonnull TableIdentifier identifier,
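
To make the reviewers' request concrete, the sketch below shows one way the surface exposed to FileIO factories could be narrowed: instead of receiving the whole CallContext, a factory would take a single-purpose credential callback, and the caller would keep CallContext (and the metastore session behind it) to itself. SubscopedCredentialsProvider and the alternative factory signature are hypothetical illustrations, not part of this PR or of the existing Polaris API; only the imported Polaris and Iceberg types are real.

// Hypothetical sketch: illustrates the reviewers' suggestion, not an existing Polaris API.
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;

/** One possible narrow view of what the factory uses CallContext for: subscoped storage credentials. */
@FunctionalInterface
interface SubscopedCredentialsProvider {
  Map<String, String> getSubscopedCredentials(
      TableIdentifier identifier,
      Set<String> tableLocations,
      Set<PolarisStorageActions> storageActions,
      PolarisResolvedPathWrapper resolvedEntityPath);
}

/** What loadFileIO could look like against the narrower contract. */
interface NarrowedFileIOFactory {
  FileIO loadFileIO(
      SubscopedCredentialsProvider credentialsProvider,
      String ioImplClassName,
      Map<String, String> properties,
      TableIdentifier identifier,
      Set<String> tableLocations,
      Set<PolarisStorageActions> storageActions,
      PolarisResolvedPathWrapper resolvedEntityPath);
}

A caller such as BasePolarisCatalog or TaskFileIOSupplier would build the provider as a closure over its own CallContext, so PolarisMetaStoreSession never crosses the factory boundary.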

==================== next file ====================

@@ -25,11 +25,9 @@
 import org.apache.polaris.core.PolarisConfiguration;
 import org.apache.polaris.core.PolarisConfigurationStore;
 import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
 import org.apache.polaris.core.persistence.PolarisEntityManager;
-import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.storage.PolarisCredentialVendor;
 import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -78,16 +76,14 @@ public static Optional<PolarisEntity> findStorageInfoFromHierarchy(
    * </ul>
    */
   public static Map<String, String> refreshCredentials(
-      RealmContext realmContext,
+      CallContext callContext,
       PolarisEntityManager entityManager,
       PolarisCredentialVendor credentialVendor,
-      PolarisMetaStoreSession metaStoreSession,
       PolarisConfigurationStore configurationStore,
       TableIdentifier tableIdentifier,
       Set<String> tableLocations,
       Set<PolarisStorageActions> storageActions,
       PolarisEntity entity) {
-    CallContext callContext = CallContext.getCurrentContext();

     boolean skipCredentialSubscopingIndirection =
         configurationStore.getConfiguration(

==================== next file ====================

@@ -27,7 +27,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.apache.polaris.core.PolarisConfigurationStore;
-import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -52,7 +52,7 @@ public WasbTranslatingFileIOFactory(

   @Override
   public FileIO loadFileIO(
-      @Nonnull RealmContext realmContext,
+      @Nonnull CallContext callContext,
       @Nonnull String ioImplClassName,
       @Nonnull Map<String, String> properties,
       @Nonnull TableIdentifier identifier,
@@ -61,7 +61,7 @@ public FileIO loadFileIO(
       @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
     return new WasbTranslatingFileIO(
         defaultFileIOFactory.loadFileIO(
-            realmContext,
+            callContext,
             ioImplClassName,
             properties,
             identifier,

==================== next file ====================

@@ -73,7 +73,7 @@ public boolean canHandleTask(TaskEntity task) {
   public boolean handleTask(TaskEntity task, CallContext callContext) {
     ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
     TableIdentifier tableId = cleanupTask.getTableId();
-    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext.getRealmContext())) {
+    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
       if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) {
         ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
         return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);

==================== next file ====================

@@ -89,7 +89,7 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
     // It's likely the cleanupTask has already been completed, but wasn't dropped successfully.
     // Log a
     // warning and move on
-    try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext.getRealmContext())) {
+    try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
       if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
         LOGGER
             .atWarn()

==================== next file ====================

@@ -28,7 +28,7 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
-import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisTaskConstants;
 import org.apache.polaris.core.entity.TableLikeEntity;
 import org.apache.polaris.core.entity.TaskEntity;
@@ -38,7 +38,7 @@
 import org.apache.polaris.service.catalog.io.FileIOFactory;

 @ApplicationScoped
-public class TaskFileIOSupplier implements BiFunction<TaskEntity, RealmContext, FileIO> {
+public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, FileIO> {
   private final FileIOFactory fileIOFactory;

   @Inject
@@ -47,7 +47,7 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
   }

   @Override
-  public FileIO apply(TaskEntity task, RealmContext realmContext) {
+  public FileIO apply(TaskEntity task, CallContext callContext) {
     Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
     Map<String, String> properties = new HashMap<>(internalProperties);

@@ -65,6 +65,6 @@ public FileIO apply(TaskEntity task, RealmContext realmContext) {
         CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO");

     return fileIOFactory.loadFileIO(
-        realmContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
+        callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
   }
 }

==================== next file ====================

@@ -189,7 +189,7 @@ public void testLoadFileIOForCleanupTask() {
     Assertions.assertThat(tasks).hasSize(1);
     TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
     FileIO fileIO =
-        new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmContext);
+        new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext);
     Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);

     // 1. BasePolarisCatalog:doCommit: for writing the table during the creation

==================== next file ====================

@@ -30,7 +30,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.apache.polaris.core.PolarisConfigurationStore;
-import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -64,7 +64,7 @@ public MeasuredFileIOFactory(

   @Override
   public FileIO loadFileIO(
-      @Nonnull RealmContext realmContext,
+      @Nonnull CallContext callContext,
       @Nonnull String ioImplClassName,
       @Nonnull Map<String, String> properties,
       @Nonnull TableIdentifier identifier,
@@ -79,7 +79,7 @@ public FileIO loadFileIO(
     MeasuredFileIO wrapped =
         new MeasuredFileIO(
             defaultFileIOFactory.loadFileIO(
-                realmContext,
+                callContext,
                 ioImplClassName,
                 properties,
                 identifier,
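
For reference, the wrapper factories above (WasbTranslatingFileIOFactory, MeasuredFileIOFactory) follow the same delegation pattern against the new signature. The sketch below shows that pattern in isolation; LoggingFileIOFactory is a hypothetical example rather than a class in Polaris, and the parameter annotations are omitted for brevity.

// Hypothetical delegating factory written against the new CallContext-first signature.
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.FileIOFactory;

public class LoggingFileIOFactory implements FileIOFactory {
  private final FileIOFactory delegate;

  public LoggingFileIOFactory(FileIOFactory delegate) {
    this.delegate = delegate;
  }

  @Override
  public FileIO loadFileIO(
      CallContext callContext,
      String ioImplClassName,
      Map<String, String> properties,
      TableIdentifier identifier,
      Set<String> tableLocations,
      Set<PolarisStorageActions> storageActions,
      PolarisResolvedPathWrapper resolvedEntityPath) {
    // Record which FileIO implementation is requested for which table, then delegate.
    System.out.printf("Loading %s for %s%n", ioImplClassName, identifier);
    return delegate.loadFileIO(
        callContext,
        ioImplClassName,
        properties,
        identifier,
        tableLocations,
        storageActions,
        resolvedEntityPath);
  }
}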