From 863536ed50fe80701c0b7931ba5ecbe5e64987fa Mon Sep 17 00:00:00 2001 From: Dennis Huo Date: Mon, 24 Feb 2025 05:29:05 +0000 Subject: [PATCH] Remove PolarisMetaStoreSession from FileIOFactory/FileIOUtil in favor of CallContext This appeared to be some leaky divergence that occurred after CallContext had been removed, but PolarisMetaStoreSession really is only a low-level implementation detail that should never be handled by BasePolarisCatalog/FileIOFactory. This plumbs CallContext explicitly into the FileIOFactory and FileIOUtil methods and thus removes a large source of CallContext.getCurrentContext calls; now the threadlocal doesn't have to be set at all in BasePolarisCatalogTest. --- .../catalog/BasePolarisCatalogTest.java | 21 +++++++------------ .../ManifestFileCleanupTaskHandlerTest.java | 2 +- .../task/TableCleanupTaskHandlerTest.java | 2 +- .../service/catalog/BasePolarisCatalog.java | 13 +++--------- .../catalog/io/DefaultFileIOFactory.java | 7 ++++--- .../service/catalog/io/FileIOFactory.java | 6 +++--- .../service/catalog/io/FileIOUtil.java | 6 +----- .../io/WasbTranslatingFileIOFactory.java | 6 +++--- .../task/ManifestFileCleanupTaskHandler.java | 2 +- .../service/task/TableCleanupTaskHandler.java | 2 +- .../service/task/TaskFileIOSupplier.java | 8 +++---- .../service/catalog/io/FileIOFactoryTest.java | 2 +- .../catalog/io/MeasuredFileIOFactory.java | 6 +++--- 13 files changed, 33 insertions(+), 50 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java index 606f70afb0..370ede6623 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java @@ -161,6 +161,7 @@ public Map getConfigOverrides() { @Inject PolarisDiagnostics diagServices; private BasePolarisCatalog catalog; + private CallContext callContext; private AwsStorageConfigInfo storageConfigModel; private StsClient stsClient; private String realmName; @@ -199,8 +200,7 @@ public void before(TestInfo testInfo) { new PolarisEntityManager( metaStoreManager, new StorageCredentialCache(), new EntityCache(metaStoreManager)); - CallContext callContext = CallContext.of(realmContext, polarisContext); - CallContext.setCurrentContext(callContext); + callContext = CallContext.of(realmContext, polarisContext); PrincipalEntity rootEntity = new PrincipalEntity( @@ -527,7 +527,7 @@ public void testValidateNotificationFailToCreateFileIO() { final String tableMetadataLocation = tableLocation + "metadata/"; PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( - CallContext.getCurrentContext(), entityManager, securityContext, catalog().name()); + callContext, entityManager, securityContext, catalog().name()); FileIOFactory fileIOFactory = spy( new DefaultFileIOFactory( @@ -538,7 +538,7 @@ public void testValidateNotificationFailToCreateFileIO() { new BasePolarisCatalog( entityManager, metaStoreManager, - CallContext.getCurrentContext(), + callContext, passthroughView, securityContext, Mockito.mock(TaskExecutor.class), @@ -854,7 +854,6 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() { .setName(catalogWithoutStorage) .build()); - CallContext callContext = CallContext.getCurrentContext(); PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( callContext, entityManager, securityContext, catalogWithoutStorage); @@ -919,7 +918,6 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() { .setName(catalogName) .build()); - CallContext callContext = CallContext.getCurrentContext(); PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( callContext, entityManager, securityContext, catalogName); @@ -1434,7 +1432,7 @@ public void testDropTableWithPurge() { new RealmEntityManagerFactory(metaStoreManagerFactory), metaStoreManagerFactory, configurationStore)) - .apply(taskEntity, () -> realmName); + .apply(taskEntity, callContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class); } @@ -1461,8 +1459,6 @@ public void testDropTableWithPurgeDisabled() { .addProperty(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "false") .setStorageConfigurationInfo(noPurgeStorageConfigModel, storageLocation) .build()); - RealmContext realmContext = () -> "realm"; - CallContext callContext = CallContext.of(realmContext, polarisContext); PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( callContext, entityManager, securityContext, noPurgeCatalogName); @@ -1542,9 +1538,6 @@ public void testRetriableException() { @Test public void testFileIOWrapper() { - RealmContext realmContext = () -> "realm"; - CallContext callContext = CallContext.of(realmContext, polarisContext); - CallContext.setCurrentContext(callContext); PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( callContext, entityManager, securityContext, CATALOG_NAME); @@ -1600,7 +1593,7 @@ public void testFileIOWrapper() { new FileIOFactory() { @Override public FileIO loadFileIO( - @NotNull RealmContext realmContext, + @NotNull CallContext callContext, @NotNull String ioImplClassName, @NotNull Map properties, @NotNull TableIdentifier identifier, @@ -1608,7 +1601,7 @@ public FileIO loadFileIO( @NotNull Set storageActions, @NotNull PolarisResolvedPathWrapper resolvedEntityPath) { return measured.loadFileIO( - realmContext, + callContext, "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of(), TABLE, diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index 16d5222769..b6c03ef8ca 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -76,7 +76,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { new FileIOFactory() { @Override public FileIO loadFileIO( - @NotNull RealmContext realmContext, + @NotNull CallContext callContext, @NotNull String ioImplClassName, @NotNull Map properties, @NotNull TableIdentifier identifier, diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 9680bd1a2c..e107eb830e 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -75,7 +75,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { new FileIOFactory() { @Override public FileIO loadFileIO( - @Nonnull RealmContext realmContext, + @Nonnull CallContext callContext, @Nonnull String ioImplClassName, @Nonnull Map properties, @Nonnull TableIdentifier identifier, diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index dbee4ca34e..c42bb7fd61 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -836,10 +836,9 @@ public Map getCredentialConfig( return Map.of(); } return FileIOUtil.refreshCredentials( - callContext.getRealmContext(), + callContext, entityManager, getCredentialVendor(), - callContext.getPolarisCallContext().getMetaStore(), callContext.getPolarisCallContext().getConfigurationStore(), tableIdentifier, getLocationsAllowedToBeAccessed(tableMetadata), @@ -1614,7 +1613,7 @@ private FileIO loadFileIOForTableLike( // Reload fileIO based on table specific context FileIO fileIO = fileIOFactory.loadFileIO( - callContext.getRealmContext(), + callContext, ioImplClassName, tableProperties, identifier, @@ -2077,13 +2076,7 @@ private FileIO loadFileIO(String ioImpl, Map properties) { new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity)); Set storageActions = Set.of(PolarisStorageActions.ALL); return fileIOFactory.loadFileIO( - callContext.getRealmContext(), - ioImpl, - properties, - identifier, - locations, - storageActions, - resolvedPath); + callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); } private void blockedUserSpecifiedWriteLocation(Map properties) { diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index 0d8a65569c..2c9b928c94 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -32,6 +32,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; @@ -70,13 +71,14 @@ public DefaultFileIOFactory( @Override public FileIO loadFileIO( - @Nonnull RealmContext realmContext, + @Nonnull CallContext callContext, @Nonnull String ioImplClassName, @Nonnull Map properties, @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + RealmContext realmContext = callContext.getRealmContext(); PolarisEntityManager entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); PolarisCredentialVendor credentialVendor = @@ -93,10 +95,9 @@ public FileIO loadFileIO( .map( storageInfo -> FileIOUtil.refreshCredentials( - realmContext, + callContext, entityManager, credentialVendor, - metaStoreSession, configurationStore, identifier, tableLocations, diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java index 451aaf716c..f3e0d6b98b 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -41,7 +41,7 @@ public interface FileIOFactory { *

This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring * secure and limited access to the table's data and locations. * - * @param realmContext the realm for which the FileIO is being loaded. + * @param callContext the call for which the FileIO is being loaded. * @param ioImplClassName the class name of the FileIO implementation to load. * @param properties configuration properties for the FileIO. * @param identifier the table identifier. @@ -51,7 +51,7 @@ public interface FileIOFactory { * @return a configured FileIO instance. */ FileIO loadFileIO( - @Nonnull RealmContext realmContext, + @Nonnull CallContext callContext, @Nonnull String ioImplClassName, @Nonnull Map properties, @Nonnull TableIdentifier identifier, diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java index 59242fca6c..e0bed634ff 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -25,11 +25,9 @@ import org.apache.polaris.core.PolarisConfiguration; import org.apache.polaris.core.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.persistence.PolarisEntityManager; -import org.apache.polaris.core.persistence.PolarisMetaStoreSession; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisCredentialVendor; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -78,16 +76,14 @@ public static Optional findStorageInfoFromHierarchy( * */ public static Map refreshCredentials( - RealmContext realmContext, + CallContext callContext, PolarisEntityManager entityManager, PolarisCredentialVendor credentialVendor, - PolarisMetaStoreSession metaStoreSession, PolarisConfigurationStore configurationStore, TableIdentifier tableIdentifier, Set tableLocations, Set storageActions, PolarisEntity entity) { - CallContext callContext = CallContext.getCurrentContext(); boolean skipCredentialSubscopingIndirection = configurationStore.getConfiguration( diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java index c98dd4e278..3bb365368d 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java @@ -27,7 +27,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.PolarisConfigurationStore; -import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -52,7 +52,7 @@ public WasbTranslatingFileIOFactory( @Override public FileIO loadFileIO( - @Nonnull RealmContext realmContext, + @Nonnull CallContext callContext, @Nonnull String ioImplClassName, @Nonnull Map properties, @Nonnull TableIdentifier identifier, @@ -61,7 +61,7 @@ public FileIO loadFileIO( @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { return new WasbTranslatingFileIO( defaultFileIOFactory.loadFileIO( - realmContext, + callContext, ioImplClassName, properties, identifier, diff --git a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java index bf6f084a01..9179e0f336 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java @@ -73,7 +73,7 @@ public boolean canHandleTask(TaskEntity task) { public boolean handleTask(TaskEntity task, CallContext callContext) { ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class); TableIdentifier tableId = cleanupTask.getTableId(); - try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext.getRealmContext())) { + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) { if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) { ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData()); return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 1f37c58062..3e5dd23f58 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -89,7 +89,7 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { // It's likely the cleanupTask has already been completed, but wasn't dropped successfully. // Log a // warning and move on - try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext.getRealmContext())) { + try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) { if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) { LOGGER .atWarn() diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index 7ce7b52906..679d48060b 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -28,7 +28,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisTaskConstants; import org.apache.polaris.core.entity.TableLikeEntity; import org.apache.polaris.core.entity.TaskEntity; @@ -38,7 +38,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; @ApplicationScoped -public class TaskFileIOSupplier implements BiFunction { +public class TaskFileIOSupplier implements BiFunction { private final FileIOFactory fileIOFactory; @Inject @@ -47,7 +47,7 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) { } @Override - public FileIO apply(TaskEntity task, RealmContext realmContext) { + public FileIO apply(TaskEntity task, CallContext callContext) { Map internalProperties = task.getInternalPropertiesAsMap(); Map properties = new HashMap<>(internalProperties); @@ -65,6 +65,6 @@ public FileIO apply(TaskEntity task, RealmContext realmContext) { CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO"); return fileIOFactory.loadFileIO( - realmContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); + callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); } } diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index b3eb25eb0a..6c808e77e9 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -189,7 +189,7 @@ public void testLoadFileIOForCleanupTask() { Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); FileIO fileIO = - new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmContext); + new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class); // 1. BasePolarisCatalog:doCommit: for writing the table during the creation diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java index 943967c9c9..3e831361ff 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java @@ -30,7 +30,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.PolarisConfigurationStore; -import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -64,7 +64,7 @@ public MeasuredFileIOFactory( @Override public FileIO loadFileIO( - @Nonnull RealmContext realmContext, + @Nonnull CallContext callContext, @Nonnull String ioImplClassName, @Nonnull Map properties, @Nonnull TableIdentifier identifier, @@ -79,7 +79,7 @@ public FileIO loadFileIO( MeasuredFileIO wrapped = new MeasuredFileIO( defaultFileIOFactory.loadFileIO( - realmContext, + callContext, ioImplClassName, properties, identifier,